Kafka to Doris

BladePipe supports data replication from Kafka to Doris. View supported migration, sync, verification, and connector capabilities.

Connection

Basic Functions

Function | Description
Incremental Data Sync

Subscribe to messages from the source Topic and transform them into DML operations (INSERT, UPDATE, and DELETE).
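
As a rough illustration of turning a message into DML operations, the sketch below parses a Canal-style JSON message into (operation, primary key, row) tuples. The field names (`type`, `pkNames`, `data`) follow the flat message layout of the open-source Canal project and are an assumption here; the formats BladePipe actually accepts are described in Message Format, and `canal_message_to_dml` is a hypothetical helper, not a BladePipe API.

```python
import json

def canal_message_to_dml(raw):
    """Turn one Canal-style JSON message into (op, key, row) tuples.

    Field names are assumed from the open-source Canal flat message layout.
    """
    msg = json.loads(raw)
    op = msg["type"].upper()
    if op not in {"INSERT", "UPDATE", "DELETE"}:
        return []  # DDL and other event types are ignored in this sketch
    pk_names = msg.get("pkNames") or []
    operations = []
    for row in msg.get("data") or []:
        key = {name: row[name] for name in pk_names}  # primary key columns only
        operations.append((op, key, row))
    return operations

# Hypothetical sample message for a table "orders" with primary key "id".
sample = json.dumps({
    "database": "shop",
    "table": "orders",
    "type": "UPDATE",
    "pkNames": ["id"],
    "data": [{"id": "1", "status": "paid"}],
})
operations = canal_message_to_dml(sample)
```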

Subscription Modification

Add, delete, or modify the subscribed topics. For more information, see Modify Subscription.

Position Resetting

Reset the consumption position by timestamp to re-consume data from a past period.
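
Resetting by timestamp amounts to finding, for each partition, the first message at or after the chosen time. The sketch below models one partition as a list of message timestamps and assumes they are non-decreasing; `offset_for_timestamp` is a hypothetical helper used only to show the lookup, not how BladePipe implements it.

```python
from bisect import bisect_left

def offset_for_timestamp(timestamps_ms, target_ms):
    """Return the offset of the first message with timestamp >= target_ms.

    The list index stands in for the partition offset. Returns None when
    every message is older than the target.
    """
    idx = bisect_left(timestamps_ms, target_ms)
    return idx if idx < len(timestamps_ms) else None

# Hypothetical partition holding four messages.
partition_timestamps = [1000, 2000, 3000, 4000]
start = offset_for_timestamp(partition_timestamps, 2500)
```

Consumption would then restart from `start`, so every message written at or after the target time is processed again.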

Advanced Functions

Function | Description
Message Format

The following message formats are supported. See Message Format.

  • CloudCanal format
  • AlibabaCanal format

Removal of Target Data before Full Data Migration

Remove the existing data in the Target before running the Full Data Migration. This applies to DataJob reruns and scheduled Full Data Migrations.

Recreating Target Table

Recreate target tables before running the Full Data Migration. This applies to DataJob reruns and scheduled Full Data Migrations.

Stream Load

Use Stream Load to write data to Doris/SelectDB BE. By default, batch write is adopted, with dynamic adjustment of data flush interval and batch size.
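
For orientation, a Stream Load write is an HTTP PUT against the Doris `_stream_load` endpoint. The sketch below only builds such a request and does not send it. The host, port (8030 is the usual default FE HTTP port), database, table, and credentials are placeholders, and `build_stream_load_request` is a hypothetical helper; how BladePipe issues the request internally is not described here.

```python
import base64
import json
import urllib.request
import uuid

def build_stream_load_request(http_host, db, table, user, password, rows):
    """Build (but do not send) a Doris Stream Load request for JSON rows."""
    url = f"http://{http_host}/api/{db}/{table}/_stream_load"
    body = json.dumps(rows).encode("utf-8")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Authorization": f"Basic {token}",
        "Expect": "100-continue",
        "label": f"sketch-{uuid.uuid4()}",  # a unique label lets Doris deduplicate retries
        "format": "json",
        "strip_outer_array": "true",        # the body is a JSON array of row objects
    }
    return urllib.request.Request(url, data=body, headers=headers, method="PUT")

# Placeholder connection details, for illustration only.
request = build_stream_load_request(
    "127.0.0.1:8030", "demo_db", "orders", "root", "",
    [{"id": 1, "status": "paid"}],
)
```

A production client also has to follow the FE-to-BE redirect and honor `Expect: 100-continue`, which plain `urllib` does not do for PUT requests.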

Handling of Zero Value for Time

Allow zero time values to be converted to a value that suits each data type, preventing errors when writing to the Target.
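
A minimal sketch of the idea, assuming the zero values are MySQL-style literals such as `0000-00-00 00:00:00` and that each column type has its own substitute. The substitute table and the `replace_zero_time` helper are hypothetical; the values BladePipe actually writes are set in the DataJob configuration.

```python
# Literals treated as "zero time" in this sketch (assumed, MySQL-style).
ZERO_TIME_LITERALS = {"0000-00-00", "0000-00-00 00:00:00"}

# Hypothetical substitutes per target column type.
SUBSTITUTES = {"DATE": "1970-01-01", "DATETIME": "1970-01-01 00:00:00"}

def replace_zero_time(value, column_type):
    """Swap a zero time literal for the substitute configured for its type."""
    if isinstance(value, str) and value.strip() in ZERO_TIME_LITERALS:
        # Types without a configured substitute fall back to None (NULL).
        return SUBSTITUTES.get(column_type.upper())
    return value

column_types = {"id": "BIGINT", "born": "DATE", "created_at": "DATETIME"}
row = {"id": 7, "born": "0000-00-00", "created_at": "2024-05-01 12:00:00"}
cleaned = {name: replace_zero_time(value, column_types[name])
           for name, value in row.items()}
```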

Limits

Limit | Description
Creating Tables in the Target in Advance

Only support automatic Topic creation for messages.

Raw Message Format

Only support raw message replication from Kafka to Kafka, and Raw Message Format needs to be selected at both the Source and the Target.

Target Table Type

Only the Unique Key model (Unique) is supported.

Source Table Type

Migration and sync of tables without primary keys are not supported.

Data Type

Binary data types such as BINARY and BLOB are not supported.

Incremental Data Write Conflict Resolution Rule

With the Stream Load method, rows are matched by primary key and the full row is replaced.
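
The effect of full row replacement can be sketched with a dictionary keyed by primary key: a later row for the same key overwrites the earlier one wholesale, with no column-by-column merge. `apply_change` is a hypothetical helper that models the target table in memory, not Doris itself.

```python
def apply_change(table, pk_names, op, row):
    """Apply one change to an in-memory table keyed by primary key."""
    key = tuple(row[name] for name in pk_names)
    if op == "DELETE":
        table.pop(key, None)
    else:
        # INSERT and UPDATE behave the same: the whole row is replaced.
        table[key] = dict(row)

target = {}
apply_change(target, ["id"], "INSERT", {"id": 1, "status": "new", "amount": 10})
apply_change(target, ["id"], "UPDATE", {"id": 1, "status": "paid", "amount": None})
```

After the second call, `amount` is `None` rather than `10`, because the incoming row replaces every column of the stored row.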


Source

Prerequisites

Prerequisite | Description
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.

Parameters

Parameter | Description
schemaFormat

MQ Message format. For more information, see Message Format.

consumerGroupId

Kafka consumer group ID.

consumeParallel

Degree of parallelism for consuming Kafka topics.

sessionTimeoutMs

Kafka session timeout in milliseconds.

maxPollRecords

Maximum number of messages fetched in one poll from Kafka.

dbHeartbeatIntervalSec

Interval for initiating heartbeat on the source database.

dbHeartbeatToleranceStep

Threshold for the gap between the latest offset and the current offset. If the actual gap exceeds the threshold, BladePipe won't send heartbeat messages.
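
Read literally, the rule is a single comparison between the consumer lag and the threshold. The sketch below shows that check; `should_send_heartbeat` is a hypothetical name, and treating a gap exactly equal to the threshold as still allowed is an assumption.

```python
def should_send_heartbeat(latest_offset, current_offset, tolerance_step):
    """Return True while the consumer lag stays within the tolerance."""
    gap = latest_offset - current_offset
    return gap <= tolerance_step

within_tolerance = should_send_heartbeat(latest_offset=1050, current_offset=1000, tolerance_step=100)
too_far_behind = should_send_heartbeat(latest_offset=5000, current_offset=1000, tolerance_step=100)
```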

customClientProps

Custom properties passed to the Kafka client in JSON format. The key is the parameter name and the value is the parameter value. This setting takes the highest priority. For an example, see AWS IAM Access Control.
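
As an illustration of the JSON shape, the sketch below builds a property map for Amazon MSK with IAM access control and serializes it. The four keys and values are the client settings documented for the `aws-msk-iam-auth` library; whether that library is available on the BladePipe Worker is an assumption to verify before use.

```python
import json

# Kafka client properties for MSK IAM authentication (aws-msk-iam-auth library).
custom_client_props = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}

# The string to paste into customClientProps.
custom_client_props_json = json.dumps(custom_client_props, indent=2)
```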

Tips: To modify the general parameters, see General Parameters and Functions.


Target

Prerequisites

Prerequisite | Description
Permissions for Account

SELECT and DDL permissions (optional)

Port Preparation

Allow the migration and sync node (Worker) to connect to the Doris/SelectDB FE QueryPort and FE/BE HttpPort.

Parameters

Parameter | Description
host

Host and port for the MySQL protocol, corresponding to the Doris/SelectDB FE QueryPort.

httpHost

Host for Doris stream load, corresponding to Doris/SelectDB FE/BE HttpPort.

totalDataInMemMb

Maximum data size allowed in memory when writing in batches. If the data size exceeds this limit, or the wait time exceeds asyncFlushIntervalSec, data is flushed to the write queue.

asyncFlushIntervalSec

Interval to wait before flushing when writing in batches. If the wait time exceeds asyncFlushIntervalSec, or the data size exceeds totalDataInMemMb, data is flushed to the write queue.

flushBatchMb

Maximum batch size per table. If the batch size exceeds this limit, data is flushed to the write queue.
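
The interaction of totalDataInMemMb, asyncFlushIntervalSec, and flushBatchMb can be sketched as a buffer with three flush triggers. `BatchBuffer` is a hypothetical class written from the parameter descriptions above, not BladePipe's implementation; in particular, it checks the time trigger only when a row is added, and the clock is injectable so the example can run without waiting.

```python
import time

class BatchBuffer:
    """Buffers rows per table and flushes them to a write queue."""

    def __init__(self, total_data_in_mem_mb, async_flush_interval_sec,
                 flush_batch_mb, clock=time.monotonic):
        self.total_limit = total_data_in_mem_mb * 1024 * 1024
        self.table_limit = flush_batch_mb * 1024 * 1024
        self.interval = async_flush_interval_sec
        self.clock = clock
        self.per_table = {}    # table name -> list of buffered rows (bytes)
        self.sizes = {}        # table name -> buffered bytes
        self.write_queue = []  # flushed (table, rows) batches
        self.last_flush = clock()

    def add(self, table, row_bytes):
        self.per_table.setdefault(table, []).append(row_bytes)
        self.sizes[table] = self.sizes.get(table, 0) + len(row_bytes)
        # Trigger 1: one table's batch exceeds flushBatchMb.
        if self.sizes[table] > self.table_limit:
            self._flush_table(table)
        # Triggers 2 and 3: total memory or wait time exceeded.
        too_big = sum(self.sizes.values()) > self.total_limit
        too_old = self.clock() - self.last_flush > self.interval
        if too_big or too_old:
            self.flush_all()

    def _flush_table(self, table):
        rows = self.per_table.pop(table, [])
        self.sizes.pop(table, None)
        if rows:
            self.write_queue.append((table, rows))

    def flush_all(self):
        for table in list(self.per_table):
            self._flush_table(table)
        self.last_flush = self.clock()

# Usage with a fake clock so the interval trigger is deterministic.
now = [0.0]
buf = BatchBuffer(total_data_in_mem_mb=1, async_flush_interval_sec=5,
                  flush_batch_mb=1, clock=lambda: now[0])
buf.add("orders", b"x" * 100)  # below every limit: stays buffered
now[0] = 6.0
buf.add("orders", b"y" * 100)  # interval exceeded: everything is flushed
```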

realFlushPauseSec

Wait time to flush data to Doris/SelectDB using stream load. 0 means no wait is needed.

soTimeoutSec

TCP socket timeout (so_timeout) during QueryPort operations.

enableTimeZoneProcess

Enable time zone conversion for time fields.

timezone

Timezone in the Target, e.g., +08:00, Asia/Shanghai, or America/New_York.

maxInSizePerQuery

Maximum number of IN clause values per query during secondary verification. Queries exceeding this limit will be automatically split.
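
The splitting behavior can be sketched as chunking the key list and issuing one parameterized query per chunk. The table name, column name, and the `split_in_queries` helper are hypothetical, and the `%s` placeholder style is just one common convention for MySQL-protocol drivers.

```python
def split_in_queries(table, pk_column, keys, max_in_size_per_query):
    """Split a key lookup into several queries of bounded IN-list size."""
    if max_in_size_per_query <= 0:
        raise ValueError("max_in_size_per_query must be positive")
    queries = []
    for start in range(0, len(keys), max_in_size_per_query):
        chunk = keys[start:start + max_in_size_per_query]
        placeholders = ", ".join(["%s"] * len(chunk))
        sql = f"SELECT * FROM {table} WHERE {pk_column} IN ({placeholders})"
        queries.append((sql, chunk))
    return queries

# Five keys with a limit of two values per query yield three queries.
queries = split_in_queries("orders", "id", [1, 2, 3, 4, 5], 2)
```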

Tips: To modify the general parameters, see General Parameters and Functions.