Skip to main content

14 posts tagged with "Tutorials"

Tutorials

View All Tags

How to Load Data From MySQL to Iceberg in Real Time?

· 6 min read
John Li
John Li

As companies deal with more data than ever before, the need for real-time, scalable, and low-cost storage becomes critical. That's where Apache Iceberg shines. In this post, I’ll walk you through how to build a real-time data sync pipeline from MySQL to Iceberg using BladePipe—a tool that makes data migration ridiculously simple.

Let’s dive in.

Iceberg

What is Iceberg?

If you haven’t heard of Iceberg yet, it’s an open table format designed for large analytic datasets. It’s kind of like a smarter table format for your data lake—supporting schema evolution, hidden partitioning, ACID-like operations, and real-time data access.

It includes two key concepts:

  • Catalog: Think of this as metadata—the table names, columns, data types, etc.
  • Data Storage: Where the metadata and actual files are stored—like on S3 or HDFS.

Why Iceberg?

Iceberg is open and flexible. It defines clear standards for catalog, file formats, data storage, and data access. This makes it widely compatible with different tools and services.

  • Catalogs: AWS Glue, Hive, Nessie, JDBC, or custom REST catalogs.
  • File formats: Parquet, ORC, Avro, etc.
  • Storage options: AWS S3, Azure Blob, MinIO, HDFS, Posix FS, local file systems, and more.
  • Data access: Real-time data warehouses like StarRocks, Doris, ClickHouse, or batch/stream processing engines like Spark, Flink, and Hive can all read, process and analyze Iceberg data.

Besides its openness, Iceberg strikes a good balance between large-scale storage and near real-time support for inserts, updates, and deletes.

Here’s a quick comparison across several database types:

Database TypeRelational DBReal-time Data WarehouseTraditional Big DataData Lake
Data CapacityUp to a few TBs100+ TBsPB levelPB level
Real-time SupportMillisecond-level latency, 10K+ QPSSecond-to-minute latency, thousands QPSHour-to-day latency, very low QPSMinute-level latency, low QPS (batch write)
TransactionsACID compliantACID compliant or eventually consistentNoNo
Storage CostHighHigh or very highVery lowLow
OpennessLowMedium(storage-compute decoupling)HighVery hcigh

From this table, it’s clear that Iceberg offers low cost, massive storage, and strong compatibility with analytics tools—a good replacement for older big data systems.

And thanks to its open architecture, you can keep exploring new use cases for it.

Why BladePipe?

Setting up Iceberg sounds great—until you realize how much work it takes to actually migrate and sync data from your transactional database. That’s where BladePipe comes in.

Supported Catalogs and Storage

BladePipe currently supports 3 Iceberg catalogs and 2 storage backends:

  • AWS Glue + AWS S3
  • Nessie + MinIO / AWS S3
  • REST Catalog + MinIO / AWS S3

For a fully cloud-based setup: Use AWS RDS + EC2 to deploy BladePipe + AWS Glue + AWS S3.

For an on-premise setup: Use a self-hosted relational database + On-Premise deployment of BladePipe + Nessie or REST catalog + MinIO.

One-Stop Data Sync

Before data replication, there's often a lot of manual setup. BladePipe takes care of that for you—automatically handling schema mapping, historical data migration, and other preparation.

Even though Iceberg isn't a traditional database, BladePipe supports an automatic data sync process, including converting schemas, mapping data types, adapting field lengths, cleaning constraints, etc. Everything happens in BladePipe.

Procedures

In this post, we’ll use:

  • Source: MySQL (self-hosted)
  • Target: Iceberg backed by AWS Glue + S3
  • Sync Tool: BladePipe (Cloud)

Let’s go step-by-step.

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Add two sources – one MySQL, one Iceberg. For Iceberg, fill in the following (replace <...> with your values):
  • Address: Fill in the AWS Glue endpoint.

    glue.<aws_glue_region_code>.amazonaws.com
  • Version: Leave as default.

  • Description: Fill in meaningful words to help identify it.

  • Extra Info:

    • httpsEnabled: Enable it to set the value as true.
    • catalogName: Enter a meaningful name, such as glue_<biz_name>_catalog.
    • catalogType: Fill in GLUE.
    • catalogWarehouse: The place where metadata and files are stored, such as s3://<biz_name>_iceberg.
    • catalogProps:
    {
    "io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "s3.endpoint": "https://s3.<aws_s3_region_code>.amazonaws.com",
    "s3.access-key-id": "<aws_s3_iam_user_access_key>",
    "s3.secret-access-key": "<aws_s3_iam_user_secret_key>",
    "s3.path-style-access": "true",
    "client.region": "<aws_s3_region>",
    "client.credentials-provider.glue.access-key-id": "<aws_glue_iam_user_access_key>",
    "client.credentials-provider.glue.secret-access-key": "<aws_glue_iam_user_secret_key>",
    "client.credentials-provider": "com.amazonaws.glue.catalog.credentials.GlueAwsCredentialsProvider"
    }

Step 3: Create a DataJob

  1. Go to DataJob > Create DataJob.
  2. Select the source and target DataSources, and click Test Connection for both. Here's the recommended Iceberg structure configuration:
    {
    "format-version": "2",
    "parquet.compression": "snappy",
    "iceberg.write.format": "parquet",
    "write.metadata.delete-after-commit.enabled": "true",
    "write.metadata.previous-versions-max": "3",
    "write.update.mode": "merge-on-read",
    "write.delete.mode": "merge-on-read",
    "write.merge.mode": "merge-on-read",
    "write.distribution-mode": "hash",
    "write.object-storage.enabled": "true",
    "write.spark.accept-any-schema": "true"
    }

info

If the test hangs, try refreshing and selecting the data source again. It might be a network or configuration issue.

  1. Select Incremental for DataJob Type, together with the Full Data option.

info

Use at least the 1 GB or 2 GB DataJob specification. Smaller specification may hit memory issues with large batches.

  1. Select the tables to be replicated.

info

It’s best to stay under 1000 tables per DataJob.

  1. Select the columns to be replicated.

  2. Confirm the DataJob creation, and start to run the DataJob.

    mysql_to_iceberg_running

Step 4: Test & Verify

  1. Generate some insert/update/delete operations on MySQL mysql_to_iceberg_incre_data

  2. Stop data generation.

  3. Set up a pay-as-you-go Aliyun EMR for StarRocks, add the AWS Glue Iceberg catalog, and run queries.

  • In StarRocks, add the external catalog:

    CREATE EXTERNAL CATALOG glue_test
    PROPERTIES
    (
    "type" = "iceberg",
    "iceberg.catalog.type" = "glue",
    "aws.glue.use_instance_profile" = "false",
    "aws.glue.access_key" = "<aws_glue_iam_user_access_key>",
    "aws.glue.secret_key" = "<aws_glue_iam_user_secret_key>",
    "aws.glue.region" = "ap-southeast-1",
    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "<aws_s3_iam_user_access_key>",
    "aws.s3.secret_key" = "<aws_s3_iam_user_secret_key>",
    "aws.s3.region" = "ap-southeast-1"
    )

    set CATALOG glue_test;

    set global new_planner_optimize_timeout=30000;
  • MySQL row count mysql_data_count

  • Iceberg row count iceberg_data_count

Summary

Building a robust, real-time data pipeline from MySQL to Iceberg used to be a heavy lift. With tools like BladePipe, it becomes as easy as clicking through a setup wizard.

Whether you're modernizing your data platform or experimenting with lakehouse architectures, this combo gives you a low-cost, high-scale option to play with.

TDengine to MySQL in Real Time - A Complete Integration Guide

· 4 min read
John Li
John Li

Overview

TDengine is an open-source, high-performance, cloud-native time series database designed for IoT, IoV, IIoT, finance, IT operations and other scenarios. In the era of industry 4.0, time series databases are widely used in power, rail, smart manufacturing and other fields.

MySQL is an open-source relational database widely used around the world. It can efficiently handle large amounts of data and complex queries, and has strong stability and reliability.

This tutorial introduces how to sync data from TDengine to MySQL using BladePipe in minutes.

Use Cases

  • Data Backup and Archiving: Migrate TDengine data to MySQL as backup, or move legacy TDengine data to MySQL for long-term storage, improving data security and high availability.
  • Complex Query: MySQL supports complex SQL queries and transaction processing. Moving data from TDengine to MySQL is necessary when in-depth analysis or complex queries of time series data are needed.
  • Data Integration and Sharing: In an organization, multiple databases are usually used at the same time to satisfy different purposes. Replicating TDengine data to MySQL can facilitate the associated analysis of time series data with other business data.
  • Data Analysis: After synchronizing TDengine data to MySQL, the data can be further moved to other OLAPs or data warehouses through BladePipe for more complex data analysis and operations, thus maximizing data value and meeting diverse business needs.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. Select Incremental for DataJob Type, together with the Full Data option.

    info

    Don't enable Start Automatically now, as the values of some parameters may need to be modified later.

  4. Select the tables to be replicated.

  5. Select the columns to be replicated.

    info

    If you need to sync data from a super table, please click Operation > Filtering to set the filtering conditions for subtable subscription. All subtables are subscribed by default. For more details, please refer to TDengine Query Topic.

    If you need to replicate nanosecond timestamp, please manually create a table at the target instance. The Timestamp column at the source instance will be mapped to the BIGINT type column at the target instance.

  6. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source TDengine instance includes the following steps:

    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob creation
  7. Go to the DataJob Details page. Click Functions > Modify DataJob Params in the upper-right corner, and modify the values of the following parameters if needed.

    • srcTimezone (source parameter): It represents the time zone of the source data source. UTC by default. Please make sure that the time zone here is consistent with the exact time zone of the source data source.
    • supportTimestampToEpochNano (source parameter): Choose whether to enable Timestamp-Number conversion. False by default.
    • dstTimezone (target parameter): It represents the time zone of the target data source. Please make sure that the time zone here is consistent with the exact time zone of the target data source.

  1. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:
    • Full Data Migration: All existing data from the source tables will be fully migrated to the target database.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target database with ultra-low latency.

Bring Oracle Data to Elasticsearch for Real-Time Search

· 5 min read
Barry
Barry

Overview

ClickHouse is an open-source column-oriented database management system. It's excellent performance in real-time data processing significantly enhances data analysis and business insights. Moving data from Oracle to ClickHouse can multiply the data power in decision making that would not be possible with Oracle alone.

This tutorial describes how to move data from Oracle to ClickHouse with BladePipe. By default, it uses ReplacingMergeTree as the ClickHouse table engine. The key features of the connection include:

  • Add _sign and _version fields in ReplacingMergeTree table.
  • Support for DDL synchronization.

Highlights

ReplacingMergeTree Optimization

In the early versions of BladePipe, when synchronizing data to ClickHouse's ReplacingMergeTree table, the following strategy was followed:

  • Insert and Update statements were converted into Insert statements.

  • Delete statements were separately processed using ALTER TABLE DELETE statements.

Though it was effective, the performance might be affected when there were a large number of Delete statements, leading to high latency.

In the latest version, BladePipe optimizes the synchronization logic, supporting _sign and _version fields in the ReplacingMergeTree table engine. All Insert, Update, and Delete statements are converted into Insert statements with version information.

Schema Migration

When migrating schemas from Oracle to ClickHouse, BladePipe uses ReplacingMergeTree as the table engine by default and automatically adds _sign and _version fields to the table:

CREATE TABLE console.worker_stats (
`id` Int64,
`gmt_create` DateTime,
`worker_id` Int64,
`cpu_stat` String,
`mem_stat` String,
`disk_stat` String,
`_sign` UInt8 DEFAULT 0,
`_version` UInt64 DEFAULT 0,
INDEX `_version_minmax_idx` (`_version`) TYPE minmax GRANULARITY 1
) ENGINE = ReplacingMergeTree(`_version`, `_sign`) ORDER BY `id`

Data Writing

DML Conversion

During data writing, BladePipe adopts the following DML conversion strategy:

  • Insert statements in Source:

    -- Insert new data, _sign value is set to 0
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 0, <new_version>);
  • Update statements in Source (converted into two Insert statements):

    -- Logically delete old data, _sign value is set to 1
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 1, <new_version>);

    -- Insert new data, _sign value is set to 0
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 0, <new_version>);
  • Delete statements in Source:

    -- Logically delete old data, _sign value is set to 1
    INSERT INTO <schema>.<table> (columns, _sign, _version) VALUES (..., 1, <new_version>);

Data Version

When writing data, BladePipe maintains version information for each table:

  • Version Initialization: During the first write, BladePipe retrieves the current table's latest version number by running:

    SELECT MAX(`_version`) FROM `console`.`worker_stats`;
  • Version Increment: Each time new data is written, BladePipe increments the version number based on the previously retrieved maximum version number, ensuring each write operation has a unique and incrementing version number.

To ensure data accuracy in queries, add the final keyword to filter out the rows that are not deleted :

SELECT `id`, `gmt_create`, `worker_id`, `cpu_stat`, `mem_stat`, `disk_stat`
FROM `console`.`worker_stats` final;

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. In the Advanced configuration of the target DataSource, choose the table engine as ReplacingMergeTree (or ReplicatedReplacingMergeTree).

  4. Select Incremental for DataJob Type, together with the Full Data option.

    info

    In the Specification settings, make sure that you select a specification of at least 1 GB.

    Allocating too little memory may result in Out of Memory (OOM) errors during DataJob execution.

  5. Select the tables to be replicated.

  6. Select the columns to be replicated.

  7. Confirm the DataJob creation.

  8. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to ClickHouse.
    • Full Data Migration: All existing data from the source tables will be fully migrated to ClickHouse.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target database.

Step 4: Verify the Data

  1. Stop data write in the Source database and wait for ClickHouse to merge data.

    info

    It's hard to know when ClickHouse merges data automatically, so you can manually trigger a merging by running the optimize table xxx final command. Note that there is a chance that this manual merging may not always succeed.

    Alternatively, you can run the create view xxx_v as select * from xxx final command to create a view and perform queries on the view to ensure the data is fully merged.

  2. Create a Verification DataJob. Once the Verification DataJob is completed, review the results to confirm that the data in ClickHouse is the same as that in Oracle.

Sync Data from Oracle to Elasticsearch

· 4 min read
John Li
John Li

Overview

Oracle is a widely-used relational database to handle large volumes of structured data, offering high performance and comprehensive support for complex transactions. With its rich ecosystem and compatibility with various applications, Oracle is often at the core of many organizations' data infrastructure.

Elasticsearch is a highly scalable, open-source search and analytics engine designed to handle large volumes of data in real time. It is widely used for log analysis, real-time monitoring, and powering search functionalities in applications.

In this tutorial, we’ll explore how to efficiently move data from Oracle to Elasticsearch with BladePipe, to unlock real-time search capabilities and enhance data-driven decision-making.

Highlights

Sync Data Based on Oracle LogMiner

For real-time data sync from Oracle sources, BladePipe significantly improves its stability and efficiency by analyzing redo logs through LogMiner after multiple rounds of optimizations. These improvements have been validated in user production environments. Key features include:

  • Oracle RAC Support: The optimization is tailored for Oracle RAC scenarios, ensuring data integrity and consistency.
  • Standardized LogMiner Parsing: By default, LogMiner's standard method (ADD_FILE) is used to parse redo logs, and CONTINUOUS_MINE is a supplement (depending on the Oracle version).
  • Full Event Consumption Mode: BladePipe supports full event consumption, ensuring stability during data sync.
  • Large Transaction Handling: Large-scale change data is cached locally, making it capable of processing over a million changes in the source Oracle database.
  • Offset Resetting: In case of consumption errors, you can reset the timestamps or SCN (System Change Number) to reconsume data, enhancing fault tolerance.
  • Data Verification and Correction: BladePipe supports scheduled data verification and correction to ensure data consistency.

With these optimizations, BladePipe delivers more robust and reliable performance when moving data from Oracle sources, meeting various complex data sync requirements.

Create Elasticsearch Index with Mapping Automatically

BladePipe supports automatical conversion of the source database table structure to Elasticsearch indexes. During this process, you can personalize the column-level index and mapping. What you can personalize include:

  • Specifying whether each column needs to be indexed.
  • Setting the tokenizer (e.g., standard tokenizer) in Elasticsearch mappings for TEXT type columns.
  • Setting the number of index shards and replicas.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.
  2. Configure the source and target DataSources.
    1. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.
    2. Select the Incremental mode in Advanced setting under the source instance: LogMiner / materialized view.
    3. Select the time zone in Advanced setting under the target instance: +08:00 by default.

3. Select Incremental for DataJob Type, together with the Full Data option. 4. Select the tables to be replicated. 5. Select the columns to be replicated.

info

If you need to select specific columns for synchronization, please create the corresponding indexes in the Target in advance.

6. Confirm the DataJob creation.

info

The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

The DataJob creation with a source Oracle instance includes the following steps:

  • Schema migration
  • Initialization of table-level supplemental logging
  • Initialization of Oracle LogMiner offset
  • Allocation of DataJobs to BladePipe Workers
  • Creation of DataJob FSM (Finite State Machine)
  • Completion of DataJob creation
  1. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to the target instance. If the index with the same name exists in the target instance, the schema won't be migrated.
    • Full Data Migration: All existing data from the source tables will be fully migrated to the target database.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target database with ultra-low latency.

Sync Data from Redis to Redis - A No-code Intuitive Way

· 3 min read
John Li
John Li

Overview

Redis is an open-source, in-memory database for key-value pairs and data structure store. It is commonly used for caching, real-time data processing, and distributed locking. It supports persistence, master-slave replication, and high-availability, suitable for use cases requiring high-concurrency and low-latency.

In this tutorial, we depicts a no-code intuitive way to sync data from Redis to Redis using BladePipe. With BladePipe, even a non-developer can finish Redis data replication in a few clicks.

Principle

BladePipe realizes Redis-Redis data sync based on Redis PSYNC command.

  1. BladePipe establishes a Socket connection with a source Redis Master.
  2. BladePipe sends an Auth command (if any).
  3. BladePipe sends PSYNC commands to Redis Master, disguised as a Redis Slave node.
  4. The Redis Master node continuously pushes binary streams to the Redis Slave node disguised by BladePipe.
  5. BladePipe parses the binary streams into a Redis command and sends it to the target Redis for execution.

redis_redis_sync_1

Limitation

Cloud-hosted Redis data sync is not supported yet, because cloud-hosted Redis adopts the forward proxy, making the PSYNC command invalid.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. In Advanced setting below the source instance, select Enable DB Mapping: yes / no.

    info

    If you enable DB mapping, please make sure that the number of DBs in the source instance and the target instance is the same.

  4. Select Incremental for DataJob Type, together with the Full Data option.

  5. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source Redis instance includes the following steps:

    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob creation
  6. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Full Data Migration: All existing data from the source instance will be fully migrated to the target instance.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target database with ultra-low latency.

Load Data from MySQL to StarRocks in Minutes

· 4 min read
Zoe
Zoe

What is StarRocks?

StarRocks is an open-source, blazing-fast Massively Parallel Processing (MPP) database. Thanks to its special yet simple design of architecture and outstanding performance in data queries, real-time data analysis becomes easier than ever before for enterprises.

Known for its scalability, speed and high performance, StarRocks is a brilliant and cost-effective choice for many data-driven organizations. It is widely used for OLAP multi-dimensional analytics, real-time analytics, high-concurrency analytics, customized reporting, ad-hoc queries, and unified analytics in finance, e-commerce and many other industries.

Features

Some of the fantastic features of StarRocks include:

  • The MPP framework enables parallel execution, greatly accelerating the data query.
  • The columnar storage lowers the data read I/Os, bringing a faster query speed.
  • The fully vectorized execution engine enlarges the power of columnar storage. With this engine, the CPU processing power is fully used, and the overall performance of operator is increased by 3 to 15 times.
  • The storage-compute separation architecture provides great scalibility and flexibility while maintaining the same functionalities as the storage-compute coupled mode.

Data Integration to StarRocks

Before enjoying the unparalleled data analysis offered by StarRocks, an important step is to integrate data from the other data sources to it. How to move massive data to StarRocks as easy as possible? BladePipe provides a sound solution.

BladePipe loads data via StarRocks Stream Load. The existing data and data changes in the Source instance are converted into byte streams and transferred via HTTP for bulk write to StarRocks.

With the Stream Load approach, all operations on the StarRocks instance are performed using INSERT statements. BladePipe automatically converts INSERT/UPDATE/DELETE operations into INSERT statements and fills in the __op value (delete identifier), enabling StarRocks to merge data automatically.

Here's a step-by-step guidance.

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select StarRocks as the Type, and fill in the setup form.
    • Client Address:The port StarRocks provided to MySQL Client. BladePipe queries the metadata in databases via it.
    • Account: The user name of the StarRocks database. The INSERT permission is required to write data to StarRocks. If the user doesn't have the INSERT permission, please grant the permission with GRANT as a reference.
    • Http Address:It is used to receive the request from BladePipe to write data to StarRocks.
  4. Click Test Connection. After successful connection, click Add DataSource to add the DataSource.
  5. Add a MySQL DataSource following the above steps.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.
  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.
  3. Select Incremental for DataJob Type, together with the Full Data option.
  4. Select the tables to be replicated. Note that the target StarRocks tables automatically created after Schema Migration have primary keys, so source tables without primary keys are not supported currently.
  5. Select the columns to be replicated.
  6. Confirm the DataJob creation. Now the DataJob starts. BladePipe will automatically run the following DataTasks:
    • Schema Migration: The schemas of the source tables will be migrated to the target instance.
    • Full Data: All existing data of the source tables will be fully migrated to the target instance.
    • Incremental: Ongoing data changes will be continuously synchronized to the target instance (with latency less than a minute).

Move Data from MongoDB to MongoDB in 3 Steps

· 4 min read
Zoe
Zoe

Overview

MongoDB is a widely used document-oriented database known for its schema flexibility and strong scalability, making it suitable for a variety of use cases.

This tutorial delves into how to quickly create a stable and efficient data pipeline from MongoDB to MongoDB using BladePipe. In this tutorial, MongoDB instances are configured as replica sets.

Highlights

Sync Data from MongoDB

Incremental data in the source MongoDB can be obtained from the oplog.rs collection in the local database (replica sets are required).

An event includes the following subdocuments (there are slight differences in different MongoDB versions). BladePipe delivers the data changes by parsing event records:

Subdocument NameDescription
opOperation type. BladePipe supports operations including c (control operation), i (INSERT), u (UPDATE), d (DELETE).
nsNamespace in the format of dbName.collectionName. If collectionName is $cmd, it indicates an operation on the corresponding database.
tsTimestamp of the operation, in seconds.
oChanged data. It shows the mirroring of data after INSERT/UPDATE operations, and the mirroring of data before DELETE operations. Note that this subdocument in MongoDB 4.x is different from that in other versions.
o2Present only in UPDATE events. It can be regarded as the primary key or identifier for locating data.

Now BladePipe supports data movement from shards and replica sets of MongoDB. The supported MongoDB version is 7.x and below.

Supported Data Types in MongoDB

In a full data migration from MongoDB or a data synchronization by consuming oplog, data type conversion is crucial for data processing with custom code and data write to target data sources. For this reason, BladePipe is iteratively expanding its support for MongoDB data types.

The supported data types in full data reading from MongoDB include: null, ObjectId, Date, Number, String.

The supported data types in incremental data synchronization from MongoDB oplog include: ObjectId, Date, Number, String, Integer, Long, BigInteger, Double, BigDecimal.

The supported data types are expanding along with the requests from the increasing users.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource, and add 2 DataSources.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. Select Incremental for DataJob Type, together with the Full Data option.

  4. Select the collections to be replicated.

  5. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source MongoDB instance includes the following steps:

    • Schema Migration
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob Creation
  6. Wait for the DataJob to automatically run.

info

Once the DataJob is created and started, BladePipe will automatically run the following DataTasks:

  • Schema Migration: The schemas of the source collections will be migrated to the target instance.
  • Full Data Migration: All existing data from the selected source collections will be fully migrated to the target instance.
  • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target instance.

How to Stream Data from Kafka to Kafka

· 3 min read
John Li
John Li

Overview

Apache Kafka is a stream-processing platform most known for its great performance, high throughput and low latency. Its persistence layer is essentially a "massive publish/subscribe message queue following a distributed transaction logging architecture," making it valuable as an enterprise-class infrastructure for processing streaming data. Therefore, the data transmission from Kafka to Kafka is of great importance for many enterprises.

This tutorial introduces how to use BladePipe to create a Kafka-Kafka real-time data pipeline.

Highlights

Pushing Messages

After a DataJob is created, BladePipe automatically creates a consumer group and subscribes to the topics to be synchronized. Then it pulls the messages from the source Kafka and pushes them to the target Kafka.

Kafka Heartbeat Mechanism

When no messages were produced at the Source Kafka, BladePipe was unable to accurately calculate the message latency.

To address the problem, BladePipe monitors the Kafka heartbeat. After Kafka heartbeat is enabled, BladePipe will monitor the consumer offsets of all partitions. If the differences between the latest offset and the current offset of all partitions are all smaller than the tolerant offset interval (configured by parameter dbHeartbeatToleranceStep), a heartbeat record containing the current system time will be generated. Upon consuming this record, BladePipe will calculate the latency based on the time included in it.

Procedure

Step 1: Grant Permissions

Please refer to Permissions Required for Kafka to grant the required permissions to a user for data movement using BladePipe.

Step 2: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 3: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource, and add 2 DataSources. image.png

Step 4: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources and click Test Connection to ensure the connection to the source and target DataSources are both successful. create_job_1st

  3. Select the message format.

    info

    If there is no specific message format, please select Raw Message Format.

  4. Select Incremental for DataJob Type. create_job_2nd

  5. Select the Topic to be synchronized. create_job_3rd

  6. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source Kafka instance includes the following steps:

    • Schema Migration
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob Creation
  7. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The topics will be created automatically in the target instance if they don't exist already.
    • Incremental Data Synchronization: Ongoing data changes will be continuously synchronized to the target instance.

    jog_watching.png

How to Move Data from PostgreSQL to PostgreSQL in Minutes

· 4 min read
Zoe
Zoe

Overview

PostgreSQL is a widely-used database system with over 35 years of development. It not only has the capabilities of a standard relational database, but also excels in executing complex SQL queries.

Users often utilize PostgreSQL for both online transaction processing (OLTP) and some data analysis tasks. This makes it important to move data from PostgreSQL to PostgreSQL.

This tutorial introduces how to sync data from PostgreSQL to PostgreSQL using BladePipe in minutes.

Highlights

PostgreSQL Logical Replication

BladePipe tracks the incremental data through the logical replication in PostgreSQL.

Publications are associated with all the tables involved in a DataJob. If the subscriptions of the DataJob are modified, the tables in the Publication are automatically changed accordingly.

DDL Sync with Triggers

DDL synchronization is essential for online database disaster recovery and other use cases, but PostgreSQL logical replication does not involve the replication of DDLs.

To address this issue, we adopt a widely-used solution, that is to use triggers to capture DDL operations and automatically write them into a regular table cc_pg_ddl_capture_tab. BladePipe can subscribe to this table to obtain DDL operations.

This mechanism is consistent with that of regular incremental DML sync, thus ensuring the correct order of DDLs and related DML events of the table.

pg_ddl_capture

Bidirectional Sync Loop Prevention

When using online databases, active geo-redundancy is usually one of the mandatory requirements. For data movement tools, it is crucial to prevent circular data replication during data synchronization.

To avoid loops in the bidirectional data synchronization between PostgreSQL databases, we mark the DML operations of the same transaction.

An additional decycle table is created. When BladePipe writes data to the target database, the DML events of the same transaction are recorded in the decycle table.

If BladePipe retrieves the event from the decycle table in the PostgreSQL instance, it ignores all operations of the current transaction, thus preventing circular data replication.

pg_loop_sync

Procedure

Step 1: Modify PostgreSQL wal_level

  1. Please refer to Permissions Required for PostgreSQL to create a user and grant the necessary permissions.

  2. Set PostgreSQL's wal_level to logical.

    info

    For self-managed databases, you can modify the postgresql.conf file to set wal_level=logical and wal_log_hints=on.

  3. Configure network permissions for the account.

    info

    For self-managed databases, you can modify the pg_hba.conf file and add the following configurations:

    • host replication <sync_user> <CIDR_address> md5
    • host <sync_database> <sync_user> <CIDR_address> md5
    • host postgres <sync_user> <CIDR_address> md5
  4. Restart PostgreSQL to apply the changes.

Step 2: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 3: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.

Step 4: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. Select Incremental for DataJob Type, together with the Full Data option.

    info

    If you select Sync DDL, BladePipe will automatically create the corresponding DDL capture triggers and events, which requires privileged permissions.

  4. Select the tables to be replicated.

  5. Select the columns to be replicated.

    info

    If you need to select specific columns for synchronization, you can create the corresponding tables in the Target in advance.

  6. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source PostgreSQL instance includes the following steps:

    • Schema migration
    • Initialization of DDL capture triggers and tables
    • Initialization of offset for PostgreSQL incremental data replication
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob creation
  7. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to the target database.
    • Full Data Migration: All existing data from the source tables will be fully migrated to the target database.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target database with ultra-low latency.

Move Data from Hana to PostgreSQL Fast and Easily

· 5 min read
Barry
Barry

Overview

SAP Hana is a column-oriented in-memory database. It stores and retrieves large volume of data and handles complex query processing. Besides, it performs advanced analytics, providing business insights via real-time data analysis.

PostgreSQL is a popular open-source relational database. It is known for its reliability, scalability and flexibility. Many organizations use it as a backend database for applications.

To move data from Hana to PostgreSQL, speed is the issue that many users care about. BladePipe lets you build a data pipeline from Hana to PostgreSQL in just minutes.

Highlights

Table-level CDC Tables

To sync incremental data from a Hana instance, BladePipe designed a single change data capture (CDC) table mode in the beginning, that is, the incremental data (generated by INSERT, UPDATE and DELETE operations) of all subscribed tables is written to the same CDC table through triggers. This design was intended to simplify the architecture and process, but it also introduced some problems.

  • Slow trigger performance:When using a single CDC table, BladePipe concatenates the field values of the subscribed tables into a JSON string. Though it is a unified approach, the trigger becomes more complex. When the number of fields exceeds 300, the efficiency of the trigger is significantly reduced, which affects the synchronization performance.

  • Incremental data backlog:When table A has much more incremental data than table B, if all the data is written to a single CDC table, the data in table B will not be processed in time, resulting in latency due to a backlog of data in table B.

Then, BladePipe optimized the single CDC table mode. It designed a table-level CDC table mode, where a CDC table is created for each source table. In a table-level CDC table, only several offset fields are added to the schema based on the original table schema for incremental data synchronization.

Original Table:

CREATE
COLUMN TABLE "SYSTEM"."TEST" (
"TEST1" INTEGER NOT NULL ,
"TEST2" INTEGER NOT NULL ,
"TEST3" INTEGER,
CONSTRAINT "TEST_KEY" PRIMARY KEY ("TEST1", "TEST2")
)

CDC Table:

CREATE
COLUMN TABLE "SYSTEM"."SYSTEM_TEST_CDC_TABLE" (
"TEST1" INTEGER,
"TEST2" INTEGER,
"TEST3" INTEGER,
"__$DATA_ID" BIGINT NOT NULL ,
"__$TRIGGER_ID" INTEGER NOT NULL ,
"__$TRANSACTION_ID" BIGINT NOT NULL ,
"__$CREATE_TIME" TIMESTAMP,
"__$OPERATION" INTEGER NOT NULL
);
-- other index

Trigger (INSERT):

CREATE TRIGGER "SYSTEM"."BLADEPIPE_ON_I_TEST_TRIGGER_TEST"
AFTER INSERT
ON "SYSTEM"."TEST"
REFERENCING NEW ROW NEW FOR EACH ROW
BEGIN
DECLARE
EXIT HANDLER FOR SQLEXCEPTION
BEGIN
END;
IF
1=1 THEN
INSERT INTO "SYSTEM"."SYSTEM_TEST_CDC_TABLE" ("__$DATA_ID", "__$TRIGGER_ID", "__$TRANSACTION_ID", "__$CREATE_TIME", "__$OPERATION", "TEST1", "TEST2", "TEST3")
VALUES(
"SYSTEM"."CC_TRIGGER_SEQ".NEXTVAL,
433,
CURRENT_UPDATE_TRANSACTION(),
CURRENT_UTCTIMESTAMP,
2,
:NEW."TEST1" ,
:NEW."TEST2" ,
:NEW."TEST3"
);
END IF;
END;

The table-level CDC table mode has several benefits:

  • Table-level CDC tables are more self-contained, making it easier to make multiple subscriptions.
  • The trigger only needs to execute the INSERT statements, so it can maintain high performance even for tables with many fields.
  • When scanning and consuming CDC data, no additional processing is required, and data consumption is simpler.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.
  2. Configure the source and target DataSources:
    1. Select the source and target DataSources, and click Test Connection.
    2. In the Advanced configuration of the source DataSource, select the CDC table mode: Single CDC table / Table-level CDC table.
  3. Select Incremental for DataJob Type, together with the Full Data option.
  4. Select the tables to be replicated.
  5. Select the fields to be replicated.
info

If you need to select specific fields for synchronization, you can first create the schema on the target PostgreSQL instance. This allows you to define the schemas and fields that you want to synchronize.

  1. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source Hana instance includes the following steps:

    • Schema Migration
    • Initialization of Hana CDC Tables and Triggers
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob Creation
  2. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to the target instance. If there is a target table with the same name as that in the source, then this table schema won't be migrated.
    • Full Data Migration: All existing data of the source tables will be fully migrated to the target instance.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target instance with ultra-low latency.

How to Move Data From MySQL to Redis in Real Time

· 3 min read
Barry
Barry

Overview

Redis is an open-source, in-memory, non-relational data store known for its high performance and flexibility. It is widely used in a range of cases, such as real-time analysis, application cache, and session management. This makes it important to integrate data to Redis.

This tutorial delves into how to use BladePipe to move data from MySQL to Redis, including the following features:

  • Support a single-node Redis instance, master/standby Redis instances, and a sharded cluster instance.
  • Allow setting a cache expiration time when writing data to a Redis instance.

Highlights

Automatic Adaptation to Sharded Clusters

There are differences in the way of writing data to Redis sharded and non-sharded clusters.

BladePipe automatically identifies the cluster sharding of Redis by obtaining Redis parameters, and adjusts the data write method to run the Incremental DataJob.

Support for Cache Expiration

It is allowed to set the cache expiration time when writing data to a Redis instance.

When creating a BladePipe DataJob, you can optionally set the expiration time (in seconds). The configuration takes effects automatically when a DataJob is running.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form respectively.
info

If the Redis instance is a cluster, please fill in all nodes or all master nodes and separate them with commas.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.
  2. Select the source and target DataSources. Set the cache expiration time (in seconds) in Advanced configuration of the target DataSource. The number <=0 means the cache won't expire.
  3. Select Incremental for DataJob Type, together with the Full Data option.
  4. Select the tables to be replicated.
info

Because the keys in Redis are composed of the primary keys of the source tables, it is not recommended to select the tables without a primary key.

  1. Select the columns to be replicated. Filter the data if needed.

  2. Confirm the creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source MySQL instance includes the following steps:

    • Schema Migration
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob Creation
  3. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to the target instance.
    • Full Data Migration: All existing data from the source tables will be fully migrated to the target instance.
    • Incremental Data Synchronization: Ongoing data changes will be continuously synchronized to the target instance.

FAQ

What should I do after a Redis master/standby switchover?

BladePipe writes data with JedisCluster, which automatically senses a master/standby switchover.

What should I do if the nodes in Redis are changed?

You can manually modify the node information of the DataJob configuration and restart the DataJob.

Accelerate GenAI - Stream Data from MySQL to Kafka

· 3 min read
John Li
John Li

Overview

In the age of AI, Apache Kafka is becoming a pivotal force due to its high-performance in real-time data streaming and processing. Many organizations are seeking to integrate data to Kafka for an enhanced efficiency and business agility. In this case, a powerful tool for data movement is of great importance. BladePipe is one of the excellent choices.

This tutorial describes how to move data from MySQL to Kafka with BladePipe, using the CloudCanal Json Format by default. The key features of the pipeline include:

  • Support multiple message formats.
  • Support DDL synchronization. You can configure the topic to which the DDL operations are written.
  • Support automatic topic creation.

Highlights

Automatic Topic Creation

The topics can be automatically created in the target Kafka during the DataJob creation. Besides, you can configure the number of partitions based on your needs.

Batch Writing of Data

In BladePipe, the same type of operations on the same table are merged into a single message, enabling batch writing of data and reducing bandwidth usage. Thus, the data processing efficiency is significantly increased.

image.png

Resumable DataJob

Resumability is essential for the synchronization of large tables with billions of records.

By regularly recording the offsets, BladePipe allows resuming Full Data and Incremental DataTasks from the last offset after they are restarted, thus minimizing the impact of unexpected pauses on progress.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource.
  3. Select the source and target DataSource type, and fill out the setup form. image.png

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

    In the Advanced configuration of the target DataSource, choose CloudCanal Json Format for Message Format. image.png

  3. Select Incremental for DataJob Type, together with the Full Data option. image.png

  4. Select the tables and columns to be replicated. When selecting the columns, you can configure the number of partitions in the target topics. image.png

  5. Confirm DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source MySQL instance includes the following steps:

    • Schema Migration
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob Creation
  6. Now the DataJob is created and started. BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to the target database.
    • Full Data Migration: All existing data from the source tables will be fully migrated to the target database.
    • Incremental Data Synchronization: Ongoing data changes will be continuously synchronized to the target instance.

FAQ

What other source DataSources does BladePipe support?

Currently, you can create a connection from MySQL, Oracle, SQL Server, PostgreSQL and MongoDB to Kafka. If you have any other requests, please give us feedbacks in the community.

Sync Data from Elasticsearch to Elasticsearch

· 4 min read
John Li
John Li

Overview

Elasticsearch is a popular search engine that forms part of the modern data stack alongside relational databases, caching, real-time data warehouses, and message-oriented middleware.

While writing data to Elasticsearch is relatively straightforward, real-time data synchronization can be more challenging.

This article describes how to migrate and sync data from Elasticsearch to Elasticsearch using BladePipe and the Elasticsearch incremental data capture plugin.

Highlights

Elasticsearch Plugin

Elasticsearch does not explicitly provide a method for real-time change data capture. However, its plugin API IndexingOperationListener can track INDEX and DELETE events. The INDEX event includes INSERT or UPDATE operations, while the DELETE event refers to traditional DELETE operations.

Once the mechanism for capturing incremental data is established, the next challenge is how to make this data available in downstream tools.

We use a dedicated index, cc_es_trigger_idx, as a container for incremental data.

This approach has several benefits:

  • No dependency on third-party components (e.g., message-oriented middleware).
  • Easy management of Elasticsearch indices.
  • Consistency with the incremental data capture method of other BladePipe data sources, allowing for code reuse.

The structure of the cc_es_trigger_idx index is as follows, where row_data holds the data after the INDEX operations, and pk stores the document _id.

{
"mappings": {
"_doc": {
"properties": {
"create_time": {
"type": "date",
"format": "yyyy-MM-dd'T'HH:mm:ssSSS"
},
"event_type": {
"type": "text",
"analyzer": "standard"
},
"idx_name": {
"type": "text",
"analyzer": "standard"
},
"pk": {
"type": "text",
"analyzer": "standard"
},
"row_data": {
"type": "text",
"index": false
},
"scn": {
"type": "long"
}
}
}
}
}

Trigger Data Scanning

As for the incremental data generated by using the Elasticsearch plugin, simply perform batch scanning in the order of the scn field in the cc_es_trigger_idx index to consume the data.

The coding style for data consumption is consistent with that used for the SAP Hana as a Source.

Open-source Plugin

Elasticsearch strictly identifies third-party packages that plugins depend on. If there are conflicts or version mismatches with Elasticsearch's own dependencies, the plugin cannot be loaded. Therefore, the plugin must be compatible with the exact version of Elasticsearch, including the minor version.

Given the impracticality of releasing numerous pre-compiled packages and to encourage widespread use, we place the open-source plugin on GitHub.

Procedure

Step 1: Install the Plugin on Source Elasticsearch

Follow the instructions in Preparation for Elasticsearch CDC to install the incremental data capture plugin.

Step 2: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 3: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource, and add 2 DataSources.

Step 4: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. Select Incremental for DataJob Type, together with the Full Data option.

    info

    In the Specification settings, make sure that you select a specification of at least 1 GB.

    Allocating too little memory may result in Out of Memory (OOM) errors during DataJob execution.

  4. Select the indices to be replicated.

  5. Select the fields to be replicated.

    info

    If you need to select specific fields for synchronization, you can first create the index on the target Elasticsearch instance. This allows you to define the schemas and fields that you want to synchronize.

  6. Confirm the DataJob creation.

    info

    The DataJob creation process involves several steps. Click Sync Settings > ConsoleJob, find the DataJob creation record, and click Details to view it.

    The DataJob creation with a source Elasticsearch instance includes the following steps:

    • Schema Migration
    • Initialization of Elasticsearch Triggers and Offsets
    • Allocation of DataJobs to BladePipe Workers
    • Creation of DataJob FSM (Finite State Machine)
    • Completion of DataJob Creation
  7. Wait for the DataJob to automatically run.

    info

    Once the DataJob is created and started, BladePipe will automatically run the following DataTasks:

    • Schema Migration: The index mapping definition in the source Elasticsearch instance will be migrated to the Target. If an index with the same name already exists in the Target, it will be ignored.
    • Full Data Migration: All existing data in the Source will be fully migrated to the Target.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target instance.

Move Data from Oracle to ClickHouse in Minutes

· 4 min read
John Li
John Li

Overview

This article explains how to move data from relational databases to ClickHouse with BladePipe. By default, it uses ReplacingMergeTree as the ClickHouse table engine. The key features of the connection include:

  • Add _version and _sign fields to ensure accurate merging in ClickHouse.
  • All DML statements are written as INSERT statements, ensuring good synchronization performance.
  • Support for DDL synchronization.

Highlights

Schema Migration

When performing schema migration with ClickHouse as the target database, the default table engine selected is ReplacingMergeTree. If replication is involved, ReplicatedReplacingMergeTree is automatically chosen.

The sort key for ClickHouse tables defaults to the primary key fields of the source table. If the source table has no primary key, tuple() is used as sort key.

Additional fields _version and _sign are added as merge fields. During synchronization, BladePipe automatically fill in these fields based on the DML statements to ensure data consistency between the source and target.

# e.g.,
CREATE TABLE console.worker_stats
(
`id` Int64,
`gmt_create` DateTime,
`worker_id` Int64,
`cpu_stat` String,
`mem_stat` String,
`disk_stat` String,
`_sign` UInt8 DEFAULT 0,
`_version` UInt64 DEFAULT 0,
INDEX `_version_minmax_idx` `_version` TYPE minmax GRANULARITY 1
)
ENGINE = ReplacingMergeTree(`_version`,`_sign`)
ORDER BY id
SETTINGS index_granularity = 8192

Data Writing

In both Full Data migration and Incremental data synchronization, all DML statements are converted into INSERTs, which are written in standard batches.

  • The _version field values increment according to the order of data changes.
  • The _sign field values are set to 0 for Insert and Update statements, and 1 for Delete statements.

The two additional fields comply with the ClickHouse ReplacingMergeTree definition.

Procedure

Step 1: Install BladePipe

Follow the instructions in Install Worker (Docker) or Install Worker (Binary) to download and install a BladePipe Worker.

Step 2: Add DataSources

  1. Log in to the BladePipe Cloud.
  2. Click DataSource > Add DataSource, and add 2 DataSources.

Step 3: Create a DataJob

  1. Click DataJob > Create DataJob.

  2. Select the source and target DataSources, and click Test Connection to ensure the connection to the source and target DataSources are both successful.

  3. In the Advanced configuration of the target DataSource, choose the table engine as ReplacingMergeTree (or ReplicatedReplacingMergeTree).

  4. Select Incremental for DataJob Type, together with the Full Data option.

    info

    In the Specification settings, make sure that you select a specification of at least 1 GB.

    Allocating too little memory may result in Out of Memory (OOM) errors during DataJob execution.

  5. Select the tables to be replicated.

  6. Select the columns to be replicated.

  7. Confirm the DataJob creation.

  8. Wait for the DataJob to automatically run.

    info

    Once the DataJob is created and started, BladePipe will automatically run the following DataTasks:

    • Schema Migration: The schemas of the source tables will be migrated to ClickHouse.
    • Full Data Migration: All existing data of the source tables will be fully migrated to ClickHouse.
    • Incremental Synchronization: Ongoing data changes will be continuously synchronized to the target database.

Step 4: Verify the Data

  1. Stop data write in the source database and wait for ClickHouse to merge data.

    info

    Due to the unpredictable timing of ClickHouse's automatic merging, you can manually trigger a merging by running the OPTIMIZE TABLE xxx FINAL; command. Note that there is a chance that this manual merging may not always succeed.

    Alternatively, you can run the CREATE VIEW xxx_v AS SELECT * FROM xxx FINAL; command to create a view and perform queries on the view to ensure the data is fully merged.

  2. Create a Verification DataJob. Once the Verification DataJob is completed, review the results to confirm that the data in ClickHouse are the same as the data in MySQL.