MySQL to Pulsar
BladePipe supports data replication from MySQL to Pulsar. View supported migration, sync, verification, and connector capabilities.
| Function | Description |
|---|---|
| Schema Migration | If the mapped Topic does not exist in the Target, BladePipe creates it automatically. The number of partitions can be configured. |
| Full Data Migration | Migrate data by scanning the tables sequentially and writing the rows in batches to the message-oriented middleware. |
| Incremental Data Sync | Sync common DML operations such as INSERT, UPDATE, and DELETE. |
| Subscription Modification | Add, delete, or modify the subscribed tables, with support for historical data migration. For more information, see Modify Subscription. |
| Position Resetting | Reset the position by Binlog file and position or by timestamp, so that incremental logs from a past period, or from a specific Binlog file and position onward, can be consumed again. |
| Metadata Retrieval | Retrieve the target metadata with filtering conditions or target primary keys set from the source table. |
Advanced Functions
| Function | Description |
|---|---|
| Message Format | Multiple message formats are supported. For more information, see Message Format. |
| Topic Mapping Rules | By default, a Topic is given the same name as the corresponding object in the Source. BladePipe also supports the following mapping rules: converting the name to lowercase, converting the name to uppercase, truncating the name by its "_digit" suffix, concatenating in the format of SCHEMA_TABLE (metadata mirroring), concatenating in the format of SCHEMA_TABLE (converting metadata to uppercase), and concatenating in the format of SCHEMA_TABLE (converting metadata to lowercase). |
| Table-level Topic | Create Topics corresponding to the tables in the Source. The table partitions can be obtained automatically. |
| Scheduled Full Data Migration | For more information, see Create Scheduled Full Data DataJob. |
| Custom Code | For more information, see Custom Code Processing, Debug Custom Code and Logging in Custom Code. |
| Data Filtering Conditions | Filter data using WHERE conditions written in SQL-92. For more information, see Data Filtering. |
| Setting Target Primary Key | Change the primary key to another field. |
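The Topic mapping rules listed above can be illustrated with a minimal Python sketch. It is not BladePipe's implementation: the function `map_topic` and the rule names are hypothetical, the "_digit" rule is assumed to mean removing a trailing underscore followed by digits, and the SCHEMA_TABLE rules are assumed to join the two names with an underscore.

```python
import re


def map_topic(schema: str, table: str, rule: str = "mirror") -> str:
    """Return a Topic name for a source table under a hypothetical rule name."""
    if rule == "mirror":
        # Default behaviour: keep the source name unchanged.
        return table
    if rule == "lower":
        return table.lower()
    if rule == "upper":
        return table.upper()
    if rule == "strip_digit_suffix":
        # Assumption: "orders_01" and "orders_02" both map to "orders".
        return re.sub(r"_\d+$", "", table)

    # Assumption: SCHEMA_TABLE means the two names joined by an underscore.
    joined = f"{schema}_{table}"
    if rule == "schema_table":
        return joined
    if rule == "schema_table_upper":
        return joined.upper()
    if rule == "schema_table_lower":
        return joined.lower()
    raise ValueError(f"unknown mapping rule: {rule}")
```

For example, under these assumptions `map_topic("Shop", "Orders_01", "strip_digit_suffix")` yields `Orders`, so sharded tables that differ only in their numeric suffix would land in one Topic.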
Limits
| Limit | Description |
|---|---|
| MySQL Storage Engine | Support InnoDB, MyISAM, AWS XEngine. Other storage engines have not been tested yet. |
| MySQL Character Set | Support utf8, utf8mb4, latin1. Other encodings have not been tested yet. |
FAQ
What to do when access to schema in MySQL Source is denied?
Tip: MySQL source-related FAQ also applies to MySQL-based DataSources.
Prerequisites
| Prerequisite | Description |
|---|---|
| Permissions for Account | |
| Enabling Binlog | [mysqld] |
Parameters
| Parameter | Description |
|---|---|
| parseBinlogParallel | Number of threads used to parse the Binlog in parallel in Incremental DataJobs. |
| parseBinlogBufferSize | Size of the circular buffer used to parse the Binlog in Incremental DataJobs. |
| maxTransactionSize | Maximum number of data rows per transaction. If a transaction exceeds this number, it is split and flushed in parts. |
| limitThroughputMb | Limit the throughput of incremental Binlog reading. |
| extraDDL | Synchronize additional DDL types. Supported values: PT, GHOST, ALI_DMS, and PT_GHOST. |
| needJsonEscape | Escape special characters in JSON before it is written to the target. |
| fullDataSqlConditionEnabled | Add filtering conditions to the SQL used to scan source data. It takes effect only in Full Data Migration. |
| srcTimeZone | Source time zone, for example +08:00, Asia/Shanghai, or America/New_York. |
Tip: To modify the general parameters, see General Parameters and Functions.
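The splitting behaviour described for maxTransactionSize can be pictured with a short Python sketch. It is only an illustration under assumptions: the function `split_transaction` is hypothetical, and a transaction is modelled as a plain list of rows rather than parsed Binlog events.

```python
from typing import List, Sequence, TypeVar

Row = TypeVar("Row")


def split_transaction(rows: Sequence[Row], max_transaction_size: int) -> List[List[Row]]:
    """Split one transaction's rows into parts of at most max_transaction_size rows."""
    if max_transaction_size <= 0:
        raise ValueError("max_transaction_size must be positive")
    # Each slice is one part that would be flushed on its own, in source order.
    return [
        list(rows[start:start + max_transaction_size])
        for start in range(0, len(rows), max_transaction_size)
    ]
```

A transaction that fits within the limit comes back as a single part, so small transactions are left untouched in this sketch.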
Prerequisites
| Prerequisite | Description |
|---|---|
| Port Preparation | Allow the migration and sync node (Worker) to connect to the nodes of Pulsar. |
Parameters
| Parameter | Description |
|---|---|
| schemaFormat | Message format. For more information, see Message Format. |
| batchWriteSize | The maximum data size of a single message. If the data exceeds this limit, it is split into multiple messages. |
| enableBatching | Whether the Pulsar producer sends messages in batches. |
| batchingMaxBytes | The maximum size, in bytes, of a batch of messages sent to Pulsar. |
| connectionTimeoutMs | The timeout, in milliseconds, for the Pulsar client to establish a connection. |
| compressionType | Compression algorithm. Supported values: LZ4, ZLIB, ZSTD, SNAPPY. |
| envelopSchemaInclude | Whether the message body contains schema information. It applies when schemaFormat is set to DEBEZIUM_ENVELOP_JSON_FOR_MQ. |
Tip: To modify the general parameters, see General Parameters and Functions.
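One way to picture the splitting described for batchWriteSize is the Python sketch below. It rests on assumptions that the table does not confirm: the limit is treated as a byte count, rows are serialized as JSON to measure their size, and the function `split_by_size` is hypothetical rather than part of BladePipe or the Pulsar client.

```python
import json
from typing import Dict, List


def split_by_size(rows: List[Dict], max_bytes: int) -> List[List[Dict]]:
    """Group rows into messages whose serialized size stays within max_bytes."""
    messages: List[List[Dict]] = []
    current: List[Dict] = []
    current_size = 0
    for row in rows:
        # Assumption: size is measured on the UTF-8 encoded JSON form of the row.
        size = len(json.dumps(row).encode("utf-8"))
        if current and current_size + size > max_bytes:
            # Adding this row would exceed the limit, so close the current message.
            messages.append(current)
            current, current_size = [], 0
        current.append(row)
        current_size += size
    if current:
        messages.append(current)
    return messages
```

In this sketch a single row larger than the limit is still emitted as its own message instead of being dropped. How the real connector handles that case is not stated in this document.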