BladePipe 1.7.0: Stronger alerts, Broader DB support, Faster KingbaseES scanning.
跳到主要内容

Kafka to PostgreSQL

BladePipe supports data replication from Kafka to PostgreSQL. View supported migration, sync, verification, and connector capabilities.

Target DataSource:

Connection

Basic Functions

FunctionDescription
Incremental Data Sync

Allow subscribing to messages from the source Topic and transforming them into DML operations, namely, INSERT, UPDATE, DELETE.

Subscription Modification

Add, delete, or modify the subscribed topics. For more information, see Modify Subscription.

Position Resetting

Reset positions by timestamp to consume the data in a past period again.

Advanced Functions

FunctionDescription
Message Format

The following message formats are supported. See Message Format.

  • CloudCanal format
  • AlibabaCanal format
Handling of Zero Value for Time

Allow setting zero value for time to different data types to prevent errors when writing to the Target.

Limits

LimitDescription
Creating Tables in the Target in Advance

Only support automatic Topic creation for messages.

Raw Message Format

Only support raw message replication from Kafka to Kafka, and Raw Message Format needs to be selected at both the Source and the Target.

Primary Key Conflict Handling

PostgreSQL <= 9.4 or Greenplum <= 6 does not support conflict skipping or replacement. Performance may be affected when there are a large number of primary key conflicts.


Source

Prerequisites

PrerequisiteDescription
Port Preparation

Allow the migration and sync node (Worker) to connect to the Kafka ports.

Parameters

ParameterDescription
schemaFormat

MQ Message format. For more information, see Message Format.

consumerGroupId

Kafka consumer group ID.

consumeParallel

Degree of consuming Kafka topics in parallel.

sessionTimeoutMs

Kafka session timeout in milliseconds.

maxPollRecords

Maximum number of messages fetched in one poll from Kafka.

dbHeartbeatIntervalSec

Interval for initiating heartbeat on the source database.

dbHeartbeatToleranceStep

The threshold of gap between the latest offset and the current offset. If the actual gap is bigger than the threshold, BladePipe won‘t send heartbeat message.

customClientProps

Custom properties passed to the Kafka client in JSON format. The key is the parameter name and the value is the parameter value. This setting takes the highest priority. For example: AWS IAM Access Control

Tips: To modify the general parameters, see General Parameters and Functions.


Target

Prerequisites

PrerequisiteDescription
Permissions for Account

Required permissions include SELECT, INSERT, DELETE, UPDATE, and common DDL permissions.
For Aliyun AnalyticDB for PostgreSQL, initial accounts or those with SELECT, INSERT, DELETE, UPDATE, and common DDL permissions are needed.

Port Preparation

Allow the migration and sync node (sidecar) to connect to PostgreSQL / Greenplum / AnalyticDB for PostgreSQL / PolarDB for PostgreSQL standard interactive interface (e.g., 5432).

Parameters

ParameterDescription
keyConflictStrategy

Strategy for handling primary key conflicts during write in Incremental DataTask:

  • IGNORE: Ignore conflicts (default)
  • REPLACE: Replace conflicts (optional)

dstWholeReplace

Convert INSERT and UPDATE operations into full row replacement in the Target.

enableTimeZoneProcess

Enable time zone conversion to datetime fields.

timezone

Target time zone, e.g., +08:00, Asia/Shanghai, America/New_York.

defaultZeroDate

Default value for replacing '0000-00-00 00:00:00' / '0000-00-00' values. Optional values include:

  • null (empty)
  • Time (14:23:33)
  • Date (1970-01-01)
  • DateTime (1970-01-01 00:00:00),
  • TimeZone Time (14:23:33+08:00 or 1970-01-01 00:00:00+08:00)
caseSensitive

Case sensitivity strategy for SQL statements, including:

  • UpperCase: Convert to uppercase
  • LowerCase: Convert to lowercase
  • Sensitive: Add qualifiers
  • NoSpecified: No conversion/No qualifiers

writeStrategy

Strategy of writing data to the Target, including:

  • ROW (single row)
  • MULTI_SQL (multiple statements)
  • BATCH (write data in batches, default option)
  • COPY (PostgreSQL COPY command)
defaultGisSRID

Set the SRID for GIS data types.

Tips: To modify the general parameters, see General Parameters and Functions.