Source PostgreSQL #
The extracted replicant-cli
will be referred to as the $REPLICANT_HOME
directory.
I. Create a user in PostgreSQL #
-
Log in to PostgreSQL client:
psql -U $POSTGRESQL_ROOT_USER
-
Create a user for replication:
CREATE USER <username> PASSWORD '<password>';
-
Grant the following permissions:
GRANT USAGE ON SCHEMA "<schema>" TO <username>; GRANT SELECT ON ALL TABLES IN SCHEMA "<schema>" TO <username>; ALTER ROLE <username> WITH REPLICATION;
II. Set up PostgreSQL for Replication #
-
Open the PostgreSQL configuration file
postgresql.conf
:vi $PGDATA/postgresql.conf
-
Change the following parameters:
wal_level = logical max_replication_slots = 1 #Can be increased if more slots need to be created
-
To perform log consumption for CDC replication from the PostgreSQL server, you must do either of the following:
- Use the
test_decoding
plugin that is by default installed in PostgreSQL. - Install the logical decoding plugin
wal2json
.
See the following two sections for instructions on how to set up these plugins.
Instructions for using
wal2json
#-
Follow the instructions in the
wal2json
project README to Install thewal2json
plugin. -
Create a logical replication slot for the catalog to be replicated with the following SQL:
SELECT 'init' FROM pg_create_logical_replication_slot('<replication_slot_name>', 'wal2json');
-
Verify the slot has been created:
SELECT * from pg_replication_slots;
Instructions for using
test_decoding
#If you’re using the
test_decoding
plugin, you don’t need to install anything as it comes pre-installed with PostgreSQL.-
Use the following SQL to create a logical replication slot for the
test_decoding
plugin:SELECT 'init' FROM pg_create_logical_replication_slot('<replication_slot_name>', 'test_decoding');
-
Verify that the slot has been created:
SELECT * from pg_replication_slots;
- Use the
-
Set the Replicant identity to
FULL
for the tables part of the replication process that do no have a primary key:ALTER TABLE <table_name> REPLICA IDENTITY FULL;
III. Set up Connection Configuration #
-
From
$REPLICANT_HOME
, navigate to the connection configuration file:vi conf/conn/postgresql.yaml
-
If you store your connection credentials in AWS Secrets Manager, you can tell Replicant to retrieve them. For more information, see Retrieve credentials from AWS Secrets Manager.
Otherwise, you can put your credentials like usernames and passwords in plain form like the sample below:
type: POSTGRESQL host: localhost #Replace localhost with your PostgreSQL host name port: 5432 #Replace the default port number 5432 if needed database: "postgres" #Replace postgres with your database name username: "replicant" #Replace replicant with your postgresql username password: "Replicant#123" #Replace Replicant#123 with your user's password max-connections: 30 #Maximum number of connections replicant can open in postgresql socket-timeout-s: 60 #The timeout value for socket read operations. The timeout is in seconds and a value of zero means that it is disabled. max-retries: 10 #Number of times any operation on the source system will be re-attempted on failures. retry-wait-duration-ms: 1000 #Duration in milliseconds Replicant should wait before performing then next retry of a failed operation. #List your replication slots (slots which hold the real-time changes of the source database) as follows replication-slots: io_replicate: #Replace "io-replicate" with your replication slot name - wal2json #plugin used to create replication slot (wal2json | test_decoding) io_replicate1: #Replace "io-replicate1" with your replication slot name - wal2json log-reader-type: {STREAM|SQL}
The value of
log-reader-type
defaults toSTREAM
. If you chooseSTREAM
, Replicant captures CDC data throughPgReplicationStream
. If you chooseSQL
, PostgreSQL server periodically receives SQL statements for CDC data extraction.Important:
- Make sure that the
max_connections
in PostgreSQL exceeds themax_connections
in the preceding connection configuration file. - From versions 23.03.01.12 and later, 23.03.31 and later,
log-reader-type
is deprecated. Avoid specifying this parameter.
- Make sure that the
-
You can also enable SSL for your connection by including the
ssl
field and specifying the necessary parameters as below:ssl: ssl-cert: <full_path_to_SSL_certificate_file> root-cert: <full_path_to_SSL_root_certificate_file> ssl-key: <full_path_to_SSL_key_file>
The key file must be in PKCS-12 or in PKCS-8 DER format. A PEM key can be converted to DER format using the following openssl command:
openssl pkcs8 -topk8 -inform PEM -in postgresql.key -outform DER -out postgresql.pk8 -v1 PBE-MD5-DES
The socket-timeout-s
parameter has been introduced in v22.02.12.16 and isn’t available in previous versions.
If the
log-reader-type
is set toSTREAM
, the replication connection must be allowed as thethat will be used to perform the replication. To enable replication connection, the pg_hba.conf
file needs to be modified with some of the following entries depending on the usecase:
- From
$REPLICANT_HOME
, navigate to the pg_hba file:vi $PGDATA/pg_hba.conf
- Make the necessary changes as follows:
# TYPE DATABASE USER ADDRESS METHOD # allow local replication connection to <username> (IPv4 + IPv6) local replication <username> trust host replication <username> 127.0.0.1/32 <auth-method> host replication <username> ::1/128 <auth-method> # allow remote replication connection from any client machine to <username> (IPv4 + IPv6) host replication <username> 0.0.0.0/0 <auth-method> host replication <username> ::0/0 <auth-method>
IV. Setup Filter Configuration #
-
From
$REPLICANT_HOME
, navigate to the filter configuration file:vi filter/postgresql_filter.yaml
-
In accordance to you replication needs, specify the data which is to be replicated. Use the format of the example explained below:
allow: #In this example, data of object type Table in the catalog postgres and schema public will be replicated catalog: "postgres" schema: "public" types: [TABLE] #From catalog postgres and schema public, only the CUSTOMERS, ORDERS, and RETURNS tables will be replicated. #Note: Unless specified, all tables in the catalog will be replicated allow: CUSTOMERS: #Within CUSTOMERS, the FB and IG columns will be replicated allow: ["FB, IG"] ORDERS: #Within ORDERS, only the product and service columns will be replicated as long as they meet the condition o_orderkey < 5000 allow: ["product", "service"] conditions: "o_orderkey < 5000" RETURNS: #All columns in the table PART will be replicated without any predicates
The following is a template of the format you must follow:
allow: catalog: <your_catalog_name> schema: <your_schema_name> types: <your_object_type> #If not collections are specified, all the data tables in the provided catalog and schema will be replicated allow: <your_table_name>: allow: ["your_column_name"] condtions: "your_condition" <your_table_name>: allow: ["your_column_name"] conditions: "your_condition" <your_table_name>: ```
For a detailed explanation of configuration parameters in the filter file, read: Filter Reference
V. Set up Extractor Configuration #
To configure replication according to your requirements, specify your configuration in the Extractor configuration file. You can find a sample Extractor configuration file postgresql.yaml
in the $REPLICANT_HOME/conf/src
directory. For a detailed explanation of configuration parameters in the Extractor file, see Extractor Reference.
You can configure the following replication modes by specifying the parameters under their respective sections in the configuration file:
snapshot
realtime
delta-snapshot
See the following sections for more information.
For more information about different Replicant modes, see Running Replicant.
Configure snapshot
replication
#
The following is a sample configuration for operating in snapshot
mode:
snapshot:
threads: 16
fetch-size-rows: 5_000
_traceDBTasks: true
min-job-size-rows: 1_000_000
max-jobs-per-chunk: 32
fetch-partition-tables: true
per-table-config:
- catalog: tpch
schema: public
tables:
lineitem:
row-identifier-key: [l_orderkey, l_linenumber]
split-key: l_orderkey
split-hints:
row-count-estimate: 15000
split-key-min-value: 1
split-key-max-value: 60_00
For more information about the configuration parameters for snapshot
mode, see Snapshot Mode.
Aditional snapshot parameters #
fetch-partition-tables
[v23.08.31.1]-
{true|false}
.Whether to fetch partition tables.
Default:
false
.
Configure realtime
replication
#
For realtime replication, you must create a heartbeat table in the source PostgreSQL database.
-
Create a heartbeat table in any schema of the database you are going to replicate with the following DDL:
CREATE TABLE "<user_database>"."public"."replicate_io_cdc_heartbeat"("timestamp" INT8 NOT NULL, PRIMARY KEY("timestamp"))
-
Grant
INSERT
,UPDATE
, andDELETE
privileges to the user configured for replication. -
Specify your configuration under the
realtime
section of the Extractor configuration file. For example:realtime: threads: 4 fetch-size-rows: 10000 fetch-duration-per-extractor-slot-s: 3 _traceDBTasks: true heartbeat: enable: true catalog: "postgres" schema: "public" table-name: replicate_io_cdc_heartbeat column-name: timestamp start-position: start-lsn: 0/3261270
For more information about the configuration parameters for realtime
mode, see Realtime Mode.
Support for DDL replication #
Replicant supports DDL replication for real-time PostgreSQL source. For more information, contact us.
Replication without replication-slots #
If you’re unable to create replication slots in PostgreSQL using either wal2json
or test_decoding,
then you can use a third mode of replication called delta-snapshot. In delta-snapshot, Replicant uses PostgreSQL’s internal column to identify changes.
Caution: We strongly recommend that you specify arow-identifier-key
in theper-table-config
section for a table which does not have a primary key or a unique key defined.
You can specify your configuration under the delta-snapshot
section of the Extractor configuration file. For example:
delta-snapshot:
row-identifier-key: [orderkey,suppkey]
update-key: [partkey]
replicate-deletes: true|false
per-table-config:
- catalog: tpch
schema: public
tables:
lineitem1:
row-identifier-key: [l_orderkey, l_linenumber]
split-key: l_orderkey
replicate-deletes: false
For more information about the configuration parameters for delta-snapshot
mode, see Delta-snapshot Mode.