Source Amazon RDS for PostgreSQL #
This page describes how to replicate data in real time from Amazon RDS for PostgreSQL, a managed service for the PostgreSQL relational database.
The following steps refer to the extracted Arcion self-hosted CLI download as the $REPLICANT_HOME
directory.
Prerequisites #
I. Set up parameter group #
- Create a database parameter group if you haven’t already specified a parameter group for your database instance.
- Set the rds.logical_replication parameter to 1 and attach the parameter group to your database instance. You must reboot your database instance for this change to take effect. After the reboot, the system automatically sets the wal_level parameter to logical.
  You can verify the values of wal_level and rds.logical_replication with the following command from the psql client:
    postgres=> SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
  The output is similar to the following:
              name           | setting
    -------------------------+---------
     rds.logical_replication | on
     wal_level               | logical
    (2 rows)
- In the parameter group, make sure max_replication_slots is at least 1 and no lower than the number of replication jobs that you need to run from this RDS for PostgreSQL instance.
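The checks in this section can be sketched as a small Python helper. This is only an illustration, not part of Replicant: the function name check_logical_replication is hypothetical, and it assumes you have already read the rows of the pg_settings query (extended to include max_replication_slots) into a dictionary of name to setting.

```python
def check_logical_replication(settings, replication_jobs):
    """Return a list of problems found in pg_settings values (empty if none).

    settings: dict mapping parameter name to its value as text, as returned
    by pg_settings. replication_jobs: number of replication jobs you plan
    to run from this instance (hypothetical input for this sketch).
    """
    problems = []
    if settings.get("rds.logical_replication") != "on":
        problems.append("rds.logical_replication is not on")
    if settings.get("wal_level") != "logical":
        problems.append("wal_level is not logical")
    slots = int(settings.get("max_replication_slots", 0))
    needed = max(1, replication_jobs)  # at least one slot, one per job
    if slots < needed:
        problems.append(
            f"max_replication_slots is {slots}, need at least {needed}"
        )
    return problems


if __name__ == "__main__":
    sample = {
        "rds.logical_replication": "on",
        "wal_level": "logical",
        "max_replication_slots": "5",
    }
    print(check_logical_replication(sample, replication_jobs=2))
```

An empty list means the three settings match what this section asks for. If rds.logical_replication is still off, the instance probably has not been rebooted since the parameter group change.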
II. Create user #
- Create a user for replication in the source RDS for PostgreSQL database instance. For example, the following creates a user alex:
    postgres=> CREATE ROLE alex LOGIN PASSWORD 'alex12345';
  For more information about creating users, see Understanding PostgreSQL roles and permissions.
- Grant the necessary permissions:
    postgres=> GRANT USAGE ON SCHEMA "arcion" TO alex;
    postgres=> GRANT SELECT ON ALL TABLES IN SCHEMA "arcion" TO alex;
    postgres=> ALTER ROLE alex WITH REPLICATION;
  The preceding commands grant the necessary permissions to user alex for the schema arcion.
III. Create logical replication slot #
- Using a privileged account, log in to the PostgreSQL catalog or database that you want to replicate.
- Create a logical replication slot in this catalog or database using the wal2json decoding plugin:
    SELECT 'init' FROM pg_create_logical_replication_slot('arcion_test', 'wal2json');
  The preceding command creates a replication slot with the name arcion_test. The wal2json plugin is available as an extension in RDS for PostgreSQL.
- Verify that you’ve successfully created a replication slot:
    postgres=> SELECT * FROM pg_replication_slots;
Set up connection configuration #
To connect to your RDS for PostgreSQL instance using basic username and password authentication, you have the following two options:
Specify your credentials in a plain text YAML connection configuration file:
type: POSTGRESQL
host: HOSTNAME
port: PORT_NUMBER
database: "DATABASE_NAME"
username: "USERNAME"
password: "PASSWORD"
max-connections: 30
socket-timeout-s: 60
max-retries: 10
retry-wait-duration-ms: 1000
# Add your replication slot (the slot that holds the real-time changes of the source database) as follows:
replication-slots:
  arcion_test:
    - wal2json
log-reader-type: {STREAM|SQL}
Replace the following:
- HOSTNAME: the hostname of the RDS for PostgreSQL instance
- PORT_NUMBER: the port number of the RDS for PostgreSQL host
- DATABASE_NAME: the database name
- USERNAME: the username credential to log into your RDS for PostgreSQL instance
- PASSWORD: the password associated with USERNAME
You can change the following parameter values as needed:
- max-connections: the maximum number of connections Replicant opens in the RDS for PostgreSQL database.
- max-retries: the number of times Replicant retries a failed operation.
- retry-wait-duration-ms: the duration in milliseconds that Replicant waits between retries of a failed operation.
- socket-timeout-s: the timeout value in seconds for socket read operations. A value of 0 disables the timeout. This parameter is only available from version 22.02.12.16.
Important: Make sure that the value of max_connections in your RDS for PostgreSQL instance exceeds the value of max-connections in the preceding connection configuration file. For more information, see Maximum number of database connections in Amazon RDS.
Replication slot #
The replication slot holds the real-time changes of the source database. The preceding sample specifies a replication slot in the following format:
replication-slots:
  SLOT_NAME:
    - PLUGIN_NAME
Replace the following:
- SLOT_NAME: the replication slot name
- PLUGIN_NAME: the plugin you’ve used to create the replication slot. In this case, it’s wal2json.
Currently only one slot can be specified.
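Because the nesting of replication-slots is easy to get wrong, here is a minimal Python sketch that writes the connection configuration shown above with explicit indentation. It is an illustration only: render_connection_config is a hypothetical helper and not part of Replicant. It accepts a single slot, matching the one-slot limit, and quotes string values with json.dumps on the assumption that JSON-style double-quoted strings are acceptable YAML scalars here.

```python
import json


def render_connection_config(host, port, database, username, password,
                             slot_name, plugin="wal2json"):
    """Build the YAML text of a connection configuration file.

    Hypothetical helper for illustration. The tuning values (30, 60, 10,
    1000) are copied from the sample configuration in this section.
    """
    quote = json.dumps  # JSON double-quoted strings, used as YAML scalars
    lines = [
        "type: POSTGRESQL",
        f"host: {quote(host)}",
        f"port: {int(port)}",
        f"database: {quote(database)}",
        f"username: {quote(username)}",
        f"password: {quote(password)}",
        "max-connections: 30",
        "socket-timeout-s: 60",
        "max-retries: 10",
        "retry-wait-duration-ms: 1000",
        "replication-slots:",
        f"  {slot_name}:",       # slot name is a key under replication-slots
        f"    - {plugin}",       # plugin is a list item under the slot name
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_connection_config(
        "example-host", 5432, "postgres", "alex", "alex12345", "arcion_test"
    ))
```

The host name example-host is a placeholder. The sketch leaves out log-reader-type because that parameter is deprecated in recent versions.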
Log reader type #
Caution: In versions 23.03.31 and later, log-reader-type is deprecated. Avoid specifying this parameter.
In versions 23.03.01.12 and later, the value of log-reader-type defaults to STREAM. If you choose STREAM, Replicant captures CDC data through PgReplicationStream. If you choose SQL, Replicant periodically sends SQL statements to the RDS for PostgreSQL server to extract CDC data.
Enable connection by username for STREAM log reader #
If you use STREAM as the log-reader-type, you must allow an authenticated replication connection as the USERNAME who performs the replication. To do so, modify the pg_hba.conf file with the following entries, depending on the use case:
- Locate and open the pg_hba.conf file. You can find the default pg_hba.conf file inside the data directory initialized by initdb.
- Make the following changes:
    # TYPE  DATABASE     USER      ADDRESS       METHOD

    # allow local replication connection to USERNAME (IPv4 + IPv6)
    local   replication  USERNAME                trust
    host    replication  USERNAME  127.0.0.1/32  <auth-method>
    host    replication  USERNAME  ::1/128       <auth-method>

    # allow remote replication connection from any client machine to USERNAME (IPv4 + IPv6)
    host    replication  USERNAME  0.0.0.0/0     trust
    host    replication  USERNAME  ::0/0         trust
  Replace USERNAME with the RDS for PostgreSQL database username that you want to authenticate for replication.
Set up filter configuration (optional) #
If you want to filter data from your source RDS for PostgreSQL database, specify the filter rules in the filter file. For more information on how to define the filter rules and run Replicant CLI with the filter file, see Filter configuration.
For example:
allow:
  catalog: "postgres"
  schema: "public"
  types: [TABLE]
  allow:
    CUSTOMERS:
      allow: ["FB", "IG"]
    ORDERS:
      allow: ["product", "service"]
      conditions: "o_orderkey < 5000"
    RETURNS:
The preceding sample consists of the following elements:
- Data of object type TABLE in the catalog postgres and the schema public goes through replication.
- From catalog postgres, only the CUSTOMERS, ORDERS, and RETURNS tables go through replication.
- From the CUSTOMERS table, only the FB and IG columns go through replication.
- From the ORDERS table, only the product and service columns go through replication, and only for rows that meet the condition in conditions.
- Since the RETURNS table doesn’t specify anything, the entire table goes through replication.
Unless you specify otherwise, Replicant replicates all tables in the catalog.
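The table and column rules described above can be modeled in a few lines of Python. This is a sketch of the described behavior only, not Replicant's implementation: the FILTER dictionary mirrors the sample filter, replicated_columns is a hypothetical function, and row-level conditions are not evaluated.

```python
from typing import Optional

# Dictionary form of the sample filter (assumed shape, for illustration).
FILTER = {
    "catalog": "postgres",
    "schema": "public",
    "types": ["TABLE"],
    "allow": {
        "CUSTOMERS": {"allow": ["FB", "IG"]},
        "ORDERS": {
            "allow": ["product", "service"],
            "conditions": "o_orderkey < 5000",
        },
        "RETURNS": None,  # nothing specified: the entire table is replicated
    },
}


def replicated_columns(rules: dict, table: str,
                       columns: list) -> Optional[list]:
    """Return the columns of `table` that pass the filter, or None if the
    table itself is filtered out. Row conditions are ignored here."""
    tables = rules.get("allow")
    if not tables:
        return list(columns)   # no table list: all tables, all columns
    if table not in tables:
        return None            # table is not in the allow list
    entry = tables[table] or {}
    allowed = entry.get("allow")
    if allowed is None:
        return list(columns)   # no column list: whole table
    return [c for c in columns if c in allowed]


if __name__ == "__main__":
    print(replicated_columns(FILTER, "CUSTOMERS", ["ID", "FB", "IG"]))
    print(replicated_columns(FILTER, "RETURNS", ["r_id", "r_reason"]))
    print(replicated_columns(FILTER, "LINEITEM", ["l_orderkey"]))
```

Here LINEITEM stands for any table missing from the allow list, so the function returns None for it.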
The following illustrates the format you must follow:
allow:
  catalog: <your_catalog_name>
  types: <your_object_type>
  allow:
    <your_table_name>:
      allow: ["your_column_name"]
      conditions: "your_condition"
    <your_table_name>:
      allow: ["your_column_name"]
      conditions: "your_condition"
    <your_table_name>:
Set up Extractor configuration #
To configure replication according to your requirements, specify your configuration in the Extractor configuration file.
You can configure the following replication modes by specifying the parameters under their respective sections in the configuration file:
snapshot
realtime
delta-snapshot
See the following sections for more information.
For more information about different Replicant modes, see Running Replicant.
Configure snapshot replication #
The following is a sample configuration for operating in snapshot mode:
snapshot:
  threads: 16
  fetch-size-rows: 5_000
  _traceDBTasks: true
  min-job-size-rows: 1_000_000
  max-jobs-per-chunk: 32
  per-table-config:
  - catalog: tpch
    schema: public
    tables:
      lineitem:
        row-identifier-key: [l_orderkey, l_linenumber]
        split-key: l_orderkey
        split-hints:
          row-count-estimate: 15000
          split-key-min-value: 1
          split-key-max-value: 60_00
For more information about the configuration parameters for snapshot mode, see Snapshot Mode.
Configure real-time replication #
For real-time replication, you must create a heartbeat table in the source RDS for PostgreSQL database.
- Create a heartbeat table in any schema of the database you are going to replicate with the following DDL:
    CREATE TABLE "<user_database>"."public"."replicate_io_cdc_heartbeat"("timestamp" INT8 NOT NULL, PRIMARY KEY("timestamp"))
- Grant INSERT, UPDATE, and DELETE privileges on the heartbeat table to the user configured for replication.
- Specify your configuration under the realtime section of the Extractor configuration file. For example:
    realtime:
      threads: 4
      fetch-size-rows: 10000
      fetch-duration-per-extractor-slot-s: 3
      _traceDBTasks: true
      heartbeat:
        enable: true
        catalog: "postgres"
        schema: "public"
        table-name: replicate_io_cdc_heartbeat
        column-name: timestamp
      start-position:
        start-lsn: 0/3261270
For more information about the configuration parameters for realtime mode, see Realtime Mode.
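The start-lsn value in the sample uses PostgreSQL's textual LSN format: two hexadecimal numbers separated by a slash, where the first is the high 32 bits and the second is the low 32 bits of a 64-bit WAL position. The following Python sketch shows how such a value can be parsed and compared. The helpers parse_lsn and format_lsn are hypothetical names for illustration and are not part of Replicant.

```python
def parse_lsn(text: str) -> int:
    """Convert a PostgreSQL LSN such as '0/3261270' to a 64-bit integer."""
    high, sep, low = text.partition("/")
    if not sep or not high or not low:
        raise ValueError(f"not an LSN: {text!r}")
    hi, lo = int(high, 16), int(low, 16)
    if not (0 <= hi <= 0xFFFFFFFF and 0 <= lo <= 0xFFFFFFFF):
        raise ValueError(f"LSN part out of range: {text!r}")
    return (hi << 32) | lo


def format_lsn(value: int) -> str:
    """Convert a 64-bit integer back to the 'X/Y' text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


if __name__ == "__main__":
    start = parse_lsn("0/3261270")
    later = parse_lsn("1/0")
    # Comparing the integers orders two positions in the write-ahead log.
    print(start < later, format_lsn(start))
```

Comparing the parsed integers is a quick way to confirm that a start-lsn you plan to use is not ahead of a position reported by the server, for example in pg_replication_slots.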
Support for DDL replication #
Replicant supports DDL replication for real-time RDS for PostgreSQL source. For more information, contact us.
Replication without replication-slots #
If you can’t create replication slots in RDS for PostgreSQL using wal2json, then you can use a third mode of replication called delta snapshot. In delta snapshot, Replicant uses an internal column of RDS for PostgreSQL to identify changes.
Caution: We strongly recommend that you specify a row-identifier-key in the per-table-config section for a table that does not have a primary key or a unique key defined.
You can specify your configuration under the delta-snapshot section of the Extractor configuration file. For example:
delta-snapshot:
  row-identifier-key: [orderkey,suppkey]
  update-key: [partkey]
  replicate-deletes: true|false
  per-table-config:
  - catalog: tpch
    schema: public
    tables:
      lineitem1:
        row-identifier-key: [l_orderkey, l_linenumber]
        split-key: l_orderkey
        replicate-deletes: false
For more information about the configuration parameters for delta-snapshot mode, see Delta-snapshot Mode.