BladePipe 1.7.0: Stronger alerts, Broader DB support, Faster KingbaseES scanning.
Skip to main content

PostgreSQL to Elasticsearch

BladePipe supports data replication from PostgreSQL to Elasticsearch. View supported migration, sync, verification, and connector capabilities.

Target DataSource:

Connection

Basic Functions

FunctionDescription
Schema Migration

If the target index does not exist, BladePipe will create the index mapping in the target based on the source metadata and mapping rules.

Full Data Migration

Migrate data by sequentially scanning data in tables and writing it in batches to the target database.

Incremental Data Sync

Sync of common DML like INSERT, UPDATE, DELETE is supported.
UPDATE and DELETE for tables without primary keys are not synced.

Data Verification and Correction

Verify all existing data. Optionally, you can correct the inconsistent data based on verification results. Scheduled DataTasks are supported.
For more information, see Create Verification and Correction DataJob.

Subscription Modification

Add, delete, or modify the subscribed tables with support for historical data migration. For more information, see Modify Subscription.

Position Resetting

Reset positions by file position or timestamp. Allow re-consumption of incremental data logs in a past period or since a specific Binlog file and position.

Index Name Mapping

Support the mapping rules, namely, concatenation with underscores (dataJobName_DB_SCHEMA_table), keeping the name the same as that in Source, converting the text to lowercase, converting the text to uppercase, truncating the name by "_digit" suffix.

DDL Sync

PostgreSQL DDL sync is realized by triggers. The user should have the permissions on triggers and tables. For more information, see Permissions Required for PostgreSQL

Metadata Retrieval

Retrieve the target metadata with filtering conditions or target primary keys set from the source table.

Advanced Functions

FunctionDescription
Removal of Target Data before Full Data Migration

Remove the existing data in the Target before running the Full Data Migration, applicable for DataJobs reruning and scheduled Full Data migrations.

Recreating Target Table

Recreate target tables before running the Full Data Migration, applicable for DataJobs reruning and scheduled Full Data migrations.

Format of Time Written to ES

Time is written to Elasticsearch in the format of the first time record of the field, or yyyy-MM-dd'T'HH:mm:ss if no time format is set.

Setting ES Time Zone

The time zone setting on the page will be written to Elasticsearch only when the time zone format is ZZZZZ.

Optional Fields in Indexing

By default, all fields are indexed. Specific fields can be excluded from indexing.

Field-level Analyzers

Allow selecting analyzers for string fields that are indexed. Support STANDARD (default), SIMPLE, and other common analyzers, with the option to specify custom analyzers.

Setting Index _id Field

By default, the _id field is a concatenation of the source primary key values. It can be changed to other field values.

Scheduled Full Data Migration

For more information, see Create Scheduled Full Data DataJob.

Custom Code

For more information, see Custom Code Processing, Debug Custom Code and Logging in Custom Code.

Data Filtering Conditions

Support data filtering using WHERE conditions, with SQL-92 as the SQL language. For more information, see Data Filtering.

Setting Target Primary Key

Change the primary key to another field to facilitate data aggregation and other operations.


Source

Prerequisites

PrerequisiteDescription
Permissions for Account

Required permissions (taking a self-managed database as an example):

  • GRANT ALL PRIVILEGES ON DATABASE sync_db TO sync_user (or SELECT permission on all views in the sync_db information_schema, and SELECT permission on tables, indexes, constraints to be synchronized)
  • ALTER USER sync_user REPLICATION
Incremental Data Sync Preparation

Prepare as follows:

  • Modify postgresql.conf, set wal_level=logical and wal_log_hints=on
  • Modify pg_hba.conf, set host replication sync_user CIDR netmask md5, host sync_db sync_user CIDR netmask md5, host postgres sync_user CIDR netmask md5
  • Restart PostgreSQL
Port Preparation

Allow the migration and sync node (Worker) to connect to the PostgreSQL port (e.g., port 5432).

Parameters

ParameterDescription
fullFetchSize

Fetch size for scaning full data.

eventStoreSize

Cache size for parsed incremental events.

ignoreGisSRID

Whether to ignore SRID when parsing GIS data types.

defaultGisSRID

Set the SRID for GIS data types.

Tips: To modify the general parameters, see General Parameters and Functions.


Target

Prerequisites

PrerequisiteDescription
Permissions for Account

create, delete, create_index, delete_index, read, write permissions for indexes.

Port Preparation

Allow the migration and sync node (Worker) to connect to the Elasticsearch port.

Parameters

ParameterDescription
maxBulkSizeMb

Maximum batch size per table; If the batch size exceeds this limit, then data is flushed to the write queue.

totalDataInMemMb

Maximum data size allowed in memory when writing in batches; If the data size exceeds the memory limit, or the wait time exceeds asyncFlushIntervalSec, then data is flushed to the write queue.

asyncFlushIntervalSec

Interval to wait for flushing when writing in batches; If the wait time exceeds asyncFlushIntervalSec, or the data size exceeds totalDataInMemMb, then data is flushed to the write queue.

realFlushPauseSec

Wait time to flush data to ElasticSearch using Bulk Write. 0 means no wait is needed.

pkSeparator

Separator for concatenating _id (number of fields > 1).

enableBulkSizeThreshold

Enable batch write mode (enabled by default).

Tips: To modify the general parameters, see General Parameters and Functions.