
MongoDB to Kafka

BladePipe supports data replication from MongoDB to Kafka. View supported migration, sync, verification, and connector capabilities.

Connection

Basic Functions

Function | Description
Full Data Migration

Migrate data by sequentially scanning data in tables and writing it in batches to the target database. Supported _id types: ObjectId, Long, Integer.

Incremental Data Sync

Syncs INSERT, UPDATE, and DELETE operations.

Subscription Modification

Add, delete, or modify the subscribed tables with support for historical data migration. For more information, see Modify Subscription.

Position Resetting

Reset the position by timestamp to re-consume the oplog from a past point in time.

Supported Deployment

Supports master-slave, replica set, and sharded cluster deployments.

Advanced Functions

Function | Description
Message Format

The following message formats are supported. See Message Format.

  • CloudCanal format
  • AlibabaCanal format
Table-Level Topic

Create topics at the source table level. Automatically handles table partitioning.

DDL Dedicated Topic

Supports specifying a topic for DDL. If not specified, DDL events are placed in partition 0 of the corresponding table topic.

Limits

Limit | Description
Oplog Size and Retention Settings

The default values of replication.oplogSizeMB and storage.oplogMinRetentionHours in MongoDB may be too small for tasks with significant sync latency. If the task falls behind, oplog entries that have not yet been consumed may be removed. In that case, increase these parameters.

Parameter Configuration for MongoDB Master-Slave Architecture

For MongoDB master-slave architecture, set the Source parameter oplogCollection to oplog.$main.

ChangeStream Mode

MongoDB 3.6 and later supports Change Streams for capturing incremental data changes. To use this mode, set the Source parameter captureMode to CHANGE_STREAM. For sharded clusters, use the MongoDB connection string for synchronization.

Oplog Mode

When using oplog mode to sync data from a MongoDB instance, ensure that the account has access to the local database.

Target Side Must Pre-Create Topic

MongoDB to Kafka / AutoMQ does not support automatic topic creation.
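
For the oplog size and retention limit above, one option on replica sets is MongoDB's replSetResizeOplog admin command. The following is a minimal sketch that only builds the command document. The inputs write_mb_per_hour and headroom are hypothetical planning values, not BladePipe or MongoDB settings, and the sizing formula is a rough assumption rather than an official recommendation.

```python
import math

# Smallest oplog size (in MB) that replSetResizeOplog accepts.
MIN_OPLOG_MB = 990


def oplog_resize_command(write_mb_per_hour, retention_hours, headroom=1.5):
    """Build a replSetResizeOplog command document.

    write_mb_per_hour and headroom are hypothetical planning inputs
    used to estimate how much oplog the retention window needs.
    """
    size_mb = math.ceil(write_mb_per_hour * retention_hours * headroom)
    return {
        "replSetResizeOplog": 1,  # command name must be the first key
        "size": float(max(size_mb, MIN_OPLOG_MB)),  # megabytes
        "minRetentionHours": float(retention_hours),  # MongoDB 4.4+
    }


cmd = oplog_resize_command(write_mb_per_hour=200, retention_hours=24)
# With pymongo, the command could be run against the admin database
# of each replica set member, for example:
#   MongoClient(uri).admin.command(cmd)
```

The command is built separately from the connection so it can be reviewed before being run on a production member.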


Source

Prerequisites

Prerequisite | Description
Permissions for Account

See Permissions Required for MongoDB.

Parameters

Parameter | Description
captureMode

The MongoDB incremental data sync mode. Supported values: OP_LOG and CHANGE_STREAM.

changeStreamBatchSize

Set the maximum number of change events per batch for MongoDB Change Stream.

oplogCollection

Specify the collection name for MongoDB oplog. The default name is oplog.rs.

timezone

Source time zone (the default time zone is UTC).
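
The rules on this page for choosing captureMode and oplogCollection can be summarized in a short sketch. The deployment labels and the returned dictionary shape are hypothetical and used only for illustration. The sketch also assumes that master-slave deployments use OP_LOG mode, since MongoDB offers Change Streams only on replica sets and sharded clusters.

```python
def source_params(deployment, mongo_version, prefer_change_stream=True):
    """Pick Source parameters from the rules described on this page.

    deployment: "master-slave", "replica-set", or "sharded"
                (hypothetical labels for this sketch).
    mongo_version: (major, minor) tuple, for example (4, 4).
    """
    if deployment == "master-slave":
        # Master-slave deployments keep the oplog in oplog.$main.
        return {"captureMode": "OP_LOG", "oplogCollection": "oplog.$main"}
    if prefer_change_stream and mongo_version >= (3, 6):
        # Change Streams require MongoDB 3.6 or later.
        return {"captureMode": "CHANGE_STREAM"}
    # Fall back to oplog mode with the default collection name.
    return {"captureMode": "OP_LOG", "oplogCollection": "oplog.rs"}


print(source_params("replica-set", (4, 4)))
print(source_params("master-slave", (3, 4)))
```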

Tips: To modify the general parameters, see General Parameters and Functions.


Target

Prerequisites

Prerequisite | Description
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.
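
Because topics are not created automatically for this link (see Limits), they need to exist before the task starts. A minimal sketch, assuming the standard kafka-topics.sh tool shipped with Apache Kafka is available: the helper below only builds the command lines, and the topic names, partition count, and replication factor are hypothetical values to adapt.

```python
import shlex


def create_topic_commands(topics, bootstrap_server, partitions=3, replication_factor=3):
    """Return one kafka-topics.sh invocation per topic name."""
    commands = []
    for topic in topics:
        argv = [
            "kafka-topics.sh", "--create", "--if-not-exists",
            "--bootstrap-server", bootstrap_server,
            "--topic", topic,
            "--partitions", str(partitions),
            "--replication-factor", str(replication_factor),
        ]
        # shlex.join quotes arguments so the line is safe to paste into a shell.
        commands.append(shlex.join(argv))
    return commands


# Hypothetical topic names: one per subscribed table, plus a fallback topic.
for line in create_topic_commands(["shop.orders", "shop.default"], "localhost:9092"):
    print(line)
```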

Parameters

Parameter | Description
schemaFormat

Message format. For more information, see Message Format.

batchWriteSize

The maximum data size of a single message. If the size exceeds the limit, the message will be split.

defaultTopic

Messages with no corresponding topic (for example, messages from a newly added table) are sent to this topic.

ddlTopic

A topic dedicated to receiving DDL events. If it is empty, DDL events are sent to partition 0 of the corresponding table topic.

compressionType

Kafka compression.type parameter, which sets the compression algorithm. Supported values: GZIP, SNAPPY, LZ4, ZSTD.

batchSize

Kafka batch.size parameter.

acks

Kafka acks parameter. By default, it is all.

maxRequestBytes

Kafka max.request.size parameter.

lingerMs

Kafka linger.ms parameter. By default, it is 1.

envelopSchemaInclude

Specifies whether the message body contains schema information. Applies when schemaFormat is set to DEBEZIUM_ENVELOP_JSON_FOR_MQ.

customClientProps

Custom properties passed to the Kafka client in JSON format. Each key is a parameter name and each value is the parameter value. This setting takes the highest priority. For an example, see AWS IAM Access Control.
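
As an illustration of the JSON shape for customClientProps, the sketch below serializes the client settings documented by the aws-msk-iam-auth library for AWS IAM access control. It assumes that library is available to the Worker, which this page does not confirm, so treat the values as a starting point to check against the linked example.

```python
import json

# Client settings documented by the aws-msk-iam-auth library.
# Assumption: the library is on the Worker's classpath.
msk_iam_props = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

# Keys are Kafka client parameter names, values are parameter values.
custom_client_props = json.dumps(msk_iam_props, indent=2)
print(custom_client_props)
```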

Tips: To modify the general parameters, see General Parameters and Functions.
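
The topic routing described for defaultTopic and ddlTopic can be sketched as follows. This is an illustration of the documented rules, not BladePipe's implementation. The event keys "table" and "is_ddl" and the table_topics mapping are hypothetical, and sending DDL for an unmapped table to partition 0 of the default topic is an assumption.

```python
def route(event, table_topics, default_topic, ddl_topic=None):
    """Return (topic, partition) for an event.

    A partition of None means the choice is left to the producer.
    event: dict with hypothetical keys "table" and "is_ddl".
    table_topics: mapping of source table name -> pre-created topic.
    """
    # Tables without a matching topic fall back to defaultTopic.
    topic = table_topics.get(event["table"], default_topic)
    if event["is_ddl"]:
        if ddl_topic:
            # A dedicated DDL topic takes precedence when configured.
            return ddl_topic, None
        # Otherwise DDL goes to partition 0 of the table's topic.
        return topic, 0
    return topic, None


topics = {"orders": "shop.orders"}
print(route({"table": "orders", "is_ddl": False}, topics, "shop.default"))
print(route({"table": "orders", "is_ddl": True}, topics, "shop.default"))
```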