Source Amazon RDS for PostgreSQL #
This page describes how to replicate data in real time from Amazon RDS for PostgreSQL, a managed service for PostgreSQL relational database.
The following steps refer to the extracted Arcion self-hosted CLI download as the $REPLICANT_HOME directory.
Prerequisites #
I. Set up parameter group #
-
Create a database parameter group if you haven’t already specified a parameter group for your database instance.
-
Set the
rds.logical_replicationparameter to1and attachrds.logical_replicationto your database instance. You must reboot your database instance for this change to take effect. After rebooting your database instance, the system automatically sets thewal_levelparameter tological.You can verify the values for
wal_levelandrds.logical_replicationwith the following command frompsqlclient:postgres=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');The output is similar to the following:
name | setting -------------------------+--------- rds.logical_replication | on wal_level | logical (2 rows) -
In the parameter group, make sure
max_replication_slotsequals to1or greater than the number of replication jobs that you need to run from this RDS for PostgreSQL instance.
II. Create user #
-
Create a user for replication in the source RDS for PostgreSQL database instance. For example, the following creates a user
alex:postgres=> CREATE ROLE alex LOGIN PASSWORD 'alex12345';For more information about creating users, see Understanding PostgreSQL roles and permissions.
-
Grant the necessary permissions:
postgres=> GRANT USAGE ON SCHEMA "arcion" TO alex; postgres=> GRANT SELECT ON ALL TABLES IN SCHEMA "arcion" TO alex; postgres=> ALTER ROLE alex WITH REPLICATION;The preceding commands grant the necessary permissions to user
alexfor the schemaarcion.
III. Create logical replication slot #
-
Log into the PostgreSQL catalog or database with a privileged account that you want to perform replication with.
-
Create a logical replication slot in this catalog or database using the
wal2jsondecoding plugin:SELECT 'init' FROM pg_create_logical_replication_slot('arcion_test', 'wal2json');The preceding command creates a replication slot with the name
arcion_test. Thewal2jsonplugin is available as an extension in RDS for PostgreSQL. -
Verify that you’ve successfully created a replication slot:
postgres=> SELECT * from pg_replication_slots;
Set up connection configuration #
To connect to your RDS for PostgreSQL instance using basic username and password authentication, you have the following two options:
Specify your credentials in a plain text YAML connection configuration file:
type: POSTGRESQL
host: HOSTNAME
port: PORT_NUMBER
database: "DATABASE_NAME"
username: "USERNAME"
password: "PASSWORD"
max-connections: 30
socket-timeout-s: 60
max-retries: 10
retry-wait-duration-ms: 1000
#Add your replication slot (slot which holds the real-time changes of the source database) as follows:
replication-slots:
arcion_test:
- wal2json
log-reader-type: {STREAM|SQL}
Replace the following:
HOSTNAME: hostname of the RDS for PostgreSQL instancePORT_NUMBER: port number of the RDS for PostgreSQL hostDATABASE_NAME: the database nameUSERNAME: the username credential to log into your RDS for PostgreSQL instancePASSWORD: the password associated withUSERNAME
Feel free to change the following parameter values as you need:
max-connections: the maximum number of connections Replicant opens in RDS for PostgreSQL database.max-retries: number of times Replicant retries a failed operation.retry-wait-duration-ms: duration in milliseconds Replicant waits between each retry of a failed operation.socket-timeout-s: the timeout value in seconds specifying socket read operations. A value of0disables socket reads. This parameter is only available from version 22.02.12.16.
Important: Make sure that the value ofmax_connectionsin your RDS for PostgreSQL instance exceeds the value ofmax-connectionsin the preceding connection configuration file. For more information, see Maximum number of database connections in Amazon RDS.
Replication slot #
The replication slot holds the real-time changes of the source database. The preceding sample specifies a replication slot in the following format:
replication-slots:
SLOT_NAME:
- PLUGIN_NAME
Replace the following:
SLOT_NAME: the replication slot namePLUGIN_NAME: the plugin you’ve used to create the replication slot. In this case, it’swal2json.
Currently only one slot can be specified.
Log reader type #
Caution: From versions 23.03.31 and later, log-reader-type is deprecated. Avoid specifying this parameter.
From versions 23.03.01.12 and later, the value of log-reader-type defaults to STREAM. If you choose STREAM, Replicant captures CDC data through PgReplicationStream. If you choose SQL, RDS for PostgreSQL server periodically receives SQL statements for CDC data extraction.
Enable connection by username for STREAM log reader
#
If you use STREAM as the log-reader-type, you must allow an authenticated replication connection as the USERNAME who performs the replication. To do so, modify the pg_hba.conf with the following entries depending on the use case:
-
Locate and open the
pg_hba.conffile. You can find the defaultpg_hba.conffile inside the data directory initialized by initdb. -
Make the following changes:
# TYPE DATABASE USER ADDRESS METHOD # allow local replication connection to USERNAME (IPv4 + IPv6) local replication USERNAME trust host replication USERNAME 127.0.0.1/32 <auth-method> host replication USERNAME ::1/128 <auth-method> # allow remote replication connection from any client machine to USERNAME (IPv4 + IPv6) host replication USERNAME 0.0.0.0/0 trust host replication USERNAME ::0/0 trustReplace
USERNAMEwith the RDS for PostgreSQL database username that you want to authenticate for replication.
Set up filter configuration (optional) #
If you want to filter data from your source RDS for PostgreSQL database, specify the filter rules in the filter file. For more information on how to define the filter rules and run Replicant CLI with the filter file, see Filter configuration.
For example:
allow:
catalog: "postgres"
schema: "public"
types: [TABLE]
allow:
CUSTOMERS:
allow: ["FB, IG"]
ORDERS:
allow: ["product", "service"]
conditions: "o_orderkey < 5000"
RETURNS:
The preceding sample consists of the following elements:
- Data of object type
TABLEin the catalogpostgresand the schemapublicgoes through replication. - From catalog
postgres, only theCUSTOMERS,ORDERS, andRETURNStables go through replication. - From
CUSTOMERStable, only theFBandIGcolumns go through replication. - From the
ORDERStable, only theproductandservicecolumns go through replication as long as those columns meet the condition inconditions. - Since the
RETURNStable doesn’t specify anything, the entire table goes through replication.
Unless you specify, Replicant replicates all tables in the catalog.
The following illustrates the format you must follow:
allow:
catalog: <your_catalog_name>
types: <your_object_type>
allow:
<your_table_name>:
allow: ["your_column_name"]
condtions: "your_condition"
<your_table_name>:
allow: ["your_column_name"]
conditions: "your_condition"
<your_table_name>:
Set up Extractor configuration #
To configure replication according to your requirements, specify your configuration in the Extractor configuration file.
You can configure the following replication modes by specifying the parameters under their respective sections in the configuration file:
snapshotrealtimedelta-snapshot
See the following sections for more information.
For more information about different Replicant modes, see Running Replicant.
Configure snapshot replication
#
The following is a sample configuration for operating in snapshot mode:
snapshot:
threads: 16
fetch-size-rows: 5_000
_traceDBTasks: true
min-job-size-rows: 1_000_000
max-jobs-per-chunk: 32
per-table-config:
- catalog: tpch
schema: public
tables:
lineitem:
row-identifier-key: [l_orderkey, l_linenumber]
split-key: l_orderkey
split-hints:
row-count-estimate: 15000
split-key-min-value: 1
split-key-max-value: 60_00
For more information about the configuration parameters for snapshot mode, see Snapshot Mode.
Configure real-time replication #
For real-time replication, you must create a heartbeat table in the source RDS for PostgreSQL database.
-
Create a heartbeat table in any schema of the database you are going to replicate with the following DDL:
CREATE TABLE "<user_database>"."public"."replicate_io_cdc_heartbeat"("timestamp" INT8 NOT NULL, PRIMARY KEY("timestamp")) -
Grant
INSERT,UPDATE, andDELETEprivileges to the user configured for replication. -
Specify your configuration under the
realtimesection of the Extractor configuration file. For example:realtime: threads: 4 fetch-size-rows: 10000 fetch-duration-per-extractor-slot-s: 3 _traceDBTasks: true heartbeat: enable: true catalog: "postgres" schema: "public" table-name: replicate_io_cdc_heartbeat column-name: timestamp start-position: start-lsn: 0/3261270
For more information about the configuration parameters for realtime mode, see Realtime Mode.
Support for DDL replication #
Replicant supports DDL replication for real-time RDS for PostgreSQL source. For more information, contact us.
Replication without replication-slots #
If can’t create replication slots in RDS for PostgreSQL using wal2json, then you can use a third mode of replication called delta snapshot. In delta snapshot, Replicant uses RDS for PostgreSQL’s internal column to identify changes.
Caution: We strongly recommend that you specify arow-identifier-keyin theper-table-configsection for a table that does not have a primary key or a unique key defined.
You can specify your configuration under the delta-snapshot section of the Extractor configuration file. For example:
delta-snapshot:
row-identifier-key: [orderkey,suppkey]
update-key: [partkey]
replicate-deletes: true|false
per-table-config:
- catalog: tpch
schema: public
tables:
lineitem1:
row-identifier-key: [l_orderkey, l_linenumber]
split-key: l_orderkey
replicate-deletes: false
For more information about the configuration parameters for delta-snapshot mode, see Delta-snapshot Mode.