Kafka to StarRocks
BladePipe supports data replication from Kafka to StarRocks. View supported migration, sync, verification, and connector capabilities.
| Function | Description |
|---|---|
Incremental Data Sync | Subscribe to messages from the source Topic and transform them into DML operations (INSERT, UPDATE, and DELETE). |
Subscription Modification | Add, delete, or modify the subscribed topics. For more information, see Modify Subscription. |
Position Resetting | Reset positions by timestamp to consume the data in a past period again. |
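Resetting a position by timestamp amounts to a timestamp-to-offset lookup for each partition. The sketch below illustrates that idea on an in-memory list. The function `offset_for_timestamp` and the sample data are hypothetical and are not part of BladePipe. The sketch assumes timestamps are non-decreasing within a partition and follows the semantics of Kafka's `offsetsForTimes`, which returns the earliest offset whose timestamp is greater than or equal to the requested one.

```python
from bisect import bisect_left
from typing import Optional, Sequence, Tuple


def offset_for_timestamp(
    records: Sequence[Tuple[int, int]], target_ms: int
) -> Optional[int]:
    """Return the earliest offset whose timestamp is >= target_ms.

    records holds (offset, timestamp_ms) pairs sorted by timestamp.
    Returns None when every record is older than target_ms.
    """
    timestamps = [ts for _, ts in records]
    idx = bisect_left(timestamps, target_ms)
    if idx == len(records):
        return None
    return records[idx][0]


# Hypothetical partition contents: (offset, timestamp in milliseconds).
partition = [
    (100, 1_700_000_000_000),
    (101, 1_700_000_005_000),
    (102, 1_700_000_010_000),
]

# Consumption would resume from this offset after the reset.
resume_offset = offset_for_timestamp(partition, 1_700_000_004_000)
print(resume_offset)  # 101
```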
Advanced Functions
| Function | Description |
|---|---|
Message Format | For the supported message formats, see Message Format. |
Removal of Target Data before Full Data Migration | Remove the existing data in the Target before running the Full Data Migration. Applies to DataJob reruns and scheduled Full Data Migrations. |
Recreating Target Table | Recreate target tables before running the Full Data Migration. Applies to DataJob reruns and scheduled Full Data Migrations. |
Stream Load | Use Stream Load to write data to StarRocks BE. By default, batch write is adopted, with dynamic adjustment of data flush interval and batch size. |
Handling of Zero Value for Time | Allow setting zero value for time to different data types to prevent errors when writing to the Target. |
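For readers unfamiliar with Stream Load, the sketch below builds (but does not send) the kind of HTTP request that StarRocks accepts for a JSON batch. It is an illustration only, not BladePipe's implementation. The host, database, table, and credentials are placeholder assumptions, and `build_stream_load_request` is a hypothetical helper. The endpoint path and the `label`, `format`, and `strip_outer_array` headers come from the StarRocks Stream Load interface.

```python
import base64
import json
import uuid
from urllib.request import Request


def build_stream_load_request(http_host, db, table, user, password, rows):
    """Build a Stream Load PUT request carrying a JSON array of rows."""
    body = json.dumps(rows).encode("utf-8")
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Authorization": f"Basic {token}",
        "Expect": "100-continue",
        # Each load needs a unique label so that retries can be deduplicated.
        "label": f"sketch-{uuid.uuid4()}",
        "format": "json",
        # The body is a JSON array; each element becomes one row.
        "strip_outer_array": "true",
    }
    url = f"http://{http_host}/api/{db}/{table}/_stream_load"
    return Request(url, data=body, headers=headers, method="PUT")


# Placeholder values; 8030 is assumed here as the FE HTTP port.
req = build_stream_load_request(
    "127.0.0.1:8030", "demo_db", "orders", "root", "", [{"id": 1, "amount": 9.5}]
)
print(req.get_method(), req.full_url)
```

Actually sending the request needs an HTTP client that handles the redirect from FE to BE for PUT requests, which is omitted here.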
Limits
| Limit | Description |
|---|---|
Creating Tables in the Target in Advance | Only automatic Topic creation for messages is supported. |
Raw Message Format | Raw message replication is supported only from Kafka to Kafka, and Raw Message Format must be selected at both the Source and the Target. |
Target Table Type | Only the Primary Key model is supported. |
Source Table Type | Migration and sync of tables without primary keys are not supported. |
DDL Synchronization Errors | |
Incremental Data Write Conflict Resolution Rule | With the Stream Load method, a row whose primary key already exists in the Target is replaced in full. |
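The conflict resolution rule can be pictured with a small in-memory model. This is a sketch of the behavior described in the table, not StarRocks or BladePipe code. `apply_upserts` and the sample rows are hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_upserts(table: Dict[Any, Row], rows: List[Row], pk: str = "id") -> None:
    """Write rows keyed by primary key; an existing row is replaced, not merged."""
    for row in rows:
        table[row[pk]] = dict(row)


target = {1: {"id": 1, "name": "a", "city": "Paris"}}

# The incoming row has the same primary key, so it replaces the old row entirely.
apply_upserts(target, [{"id": 1, "name": "b", "city": None}])
print(target[1])  # {'id': 1, 'name': 'b', 'city': None}
```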
Prerequisites
| Prerequisite | Description |
|---|---|
Port Preparation | Allow the migration and sync node (Worker) to connect to the Kafka ports. |
Parameters
| Parameter | Description |
|---|---|
schemaFormat | MQ Message format. For more information, see Message Format. |
consumerGroupId | Kafka consumer group ID. |
consumeParallel | Degree of consuming Kafka topics in parallel. |
sessionTimeoutMs | Kafka session timeout in milliseconds. |
maxPollRecords | Maximum number of messages fetched in one poll from Kafka. |
dbHeartbeatIntervalSec | Interval for initiating heartbeat on the source database. |
dbHeartbeatToleranceStep | Threshold for the gap between the latest offset and the current offset. If the actual gap is larger than the threshold, BladePipe won't send a heartbeat message. |
customClientProps | Custom properties passed to the Kafka client in JSON format. The key is the parameter name and the value is the parameter value. This setting takes the highest priority. For example: AWS IAM Access Control |
Tips: To modify the general parameters, see General Parameters and Functions.
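As an illustration of the JSON shape that `customClientProps` expects, the sketch below prints a property map for AWS IAM access control on Amazon MSK. The property names and class names are those used by the `aws-msk-iam-auth` library. Whether a given deployment needs additional properties is an assumption to check against your cluster configuration.

```python
import json

# Keys are Kafka client parameter names; values are the parameter values.
custom_client_props = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class": (
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    ),
}

# The printed JSON text is what would be entered as the parameter value.
print(json.dumps(custom_client_props, indent=2))
```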
Prerequisites
| Prerequisite | Description |
|---|---|
Permissions for Account | SELECT and DDL permissions (optional) |
Port Preparation | Allow the migration and sync node (Worker) to connect to the StarRocks FE QueryPort and FE/BE HttpPort. |
Parameters
| Parameter | Description |
|---|---|
host | MySQL port, corresponding to StarRocks FE QueryPort. |
httpHost | Host for StarRocks stream load, corresponding to StarRocks FE/BE HttpPort. |
totalDataInMemMb | Maximum data size allowed in memory when writing in batches. If the data size exceeds this limit, or the wait time exceeds asyncFlushIntervalSec, data is flushed to the write queue. |
asyncFlushIntervalSec | Interval to wait before flushing when writing in batches. If the wait time exceeds asyncFlushIntervalSec, or the data size exceeds totalDataInMemMb, data is flushed to the write queue. |
flushBatchMb | Maximum batch size per table. If the batch size exceeds this limit, data is flushed to the write queue. |
realFlushPauseSec | Wait time before flushing data to StarRocks using Stream Load. 0 means no wait. |
soTimeoutSec | TCP socket timeout (so_timeout) during QueryPort operations. |
httpSoTimeoutSec | TCP socket timeout (so_timeout) during HttpPort operations. |
enableTimeZoneProcess | Enable time zone conversion for time fields. |
timezone | Time zone in the Target, e.g., +08:00, Asia/Shanghai, or America/New_York. |
maxInSizePerQuery | Maximum number of IN clause values per query during secondary verification. Queries exceeding this limit will be automatically split. |
Tips: To modify the general parameters, see General Parameters and Functions.
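The interaction between `totalDataInMemMb`, `asyncFlushIntervalSec`, and `flushBatchMb` can be summarized as a single flush condition. The sketch below is an interpretation of the parameter descriptions, not BladePipe's implementation. `FlushConfig`, `should_flush`, and the numeric values are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class FlushConfig:
    total_data_in_mem_mb: float      # mirrors totalDataInMemMb
    async_flush_interval_sec: float  # mirrors asyncFlushIntervalSec
    flush_batch_mb: float            # mirrors flushBatchMb


def should_flush(
    cfg: FlushConfig, total_mem_mb: float, table_batch_mb: float, waited_sec: float
) -> bool:
    """Return True when any of the three documented thresholds is exceeded."""
    return (
        total_mem_mb > cfg.total_data_in_mem_mb
        or waited_sec > cfg.async_flush_interval_sec
        or table_batch_mb > cfg.flush_batch_mb
    )


# Illustrative values only; they are not the product defaults.
cfg = FlushConfig(total_data_in_mem_mb=1024, async_flush_interval_sec=5, flush_batch_mb=10)

print(should_flush(cfg, total_mem_mb=200, table_batch_mb=2, waited_sec=1))   # False
print(should_flush(cfg, total_mem_mb=200, table_batch_mb=12, waited_sec=1))  # True
```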