BladePipe 1.7.0: Stronger alerts, Broader DB support, Faster KingbaseES scanning.
Skip to main content

PostgreSQL to RocketMQ

BladePipe supports data replication from PostgreSQL to RocketMQ. View supported migration, sync, verification, and connector capabilities.

Target DataSource:

Connection

Basic Functions

FunctionDescription
Full Data Migration

Migrate data by sequentially scanning data in tables and writing it in batches to the message middleware.

Incremental Data Sync

Sync of common DML like INSERT, UPDATE, DELETE is supported.

Subscription Modification

Add, delete, or modify the subscribed tables with support for historical data migration. For more information, see Modify Subscription.

Position Resetting

Reset positions by file position or timestamp. Allow re-consumption of incremental data logs in a past period or since a specific Binlog file and position.

Metadata Retrieval

Retrieve the target metadata with filtering conditions or target primary keys set from the source table.

Advanced Functions

FunctionDescription
Message Format

The following message formats are supported. See Message Format.

  • CloudCanal format
  • AlibabaCanal format
Topic Mapping Rules

By default, Topic is formed by connecting source instance id, database, and table with . in between. Also, it supports the mapping rules, namely, keeping the name the same as that in Source, converting the text to lowercase, converting the text to uppercase.

Table-level Topic

Create Topics corresponding to the tables in the Source, and the table partitions can be obtained automatically.

DDL Dedicated Topic

Allow specifying a Topic for DDL. If not specified, DDL time is placed in partition 0 of the Topic created from the corresponding table.

Scheduled Full Data Migration

For more information, see Create Scheduled Full Data DataJob.

Custom Code

For more information, see Custom Code Processing, Debug Custom Code and Logging in Custom Code.

Data Filtering Conditions

Support data filtering using WHERE conditions, with SQL-92 as the SQL language. For more information, see Data Filtering.

Setting Target Primary Key

Change the primary key to another field.


Source

Prerequisites

PrerequisiteDescription
Permissions for Account

Required permissions (taking a self-managed database as an example):

  • GRANT ALL PRIVILEGES ON DATABASE sync_db TO sync_user (or SELECT permission on all views in the sync_db information_schema, and SELECT permission on tables, indexes, constraints to be synchronized)
  • ALTER USER sync_user REPLICATION
Incremental Data Sync Preparation

Prepare as follows:

  • Modify postgresql.conf, set wal_level=logical and wal_log_hints=on
  • Modify pg_hba.conf, set host replication sync_user CIDR netmask md5, host sync_db sync_user CIDR netmask md5, host postgres sync_user CIDR netmask md5
  • Restart PostgreSQL
Port Preparation

Allow the migration and sync node (Worker) to connect to the PostgreSQL port (e.g., port 5432).

Parameters

ParameterDescription
fullFetchSize

Fetch size for scaning full data.

eventStoreSize

Cache size for parsed incremental events.

ignoreGisSRID

Whether to ignore SRID when parsing GIS data types.

defaultGisSRID

Set the SRID for GIS data types.

Tips: To modify the general parameters, see General Parameters and Functions.


Target

Prerequisites

PrerequisiteDescription
Port Preparation

Allow the migration and sync node (Worker) to connect to the RocketMQ ports.

Parameters

ParameterDescription
schemaFormat

Message format. For more information, see Message Format.

Tips: To modify the general parameters, see General Parameters and Functions.