PostgreSQL

Source: PostgreSQL #

The extracted replicant-cli will be referred to as the $REPLICANT_HOME directory.

I. Create a user in PostgreSQL #

  1. Log in to PostgreSQL client:

    psql -U $POSTGRESQL_ROOT_USER
    
  2. Create a user for replication:

    CREATE USER <username> PASSWORD '<password>';
    
  3. Grant the following permissions:

    GRANT USAGE ON SCHEMA "<schema>" TO <username>;
    GRANT SELECT ON ALL TABLES IN SCHEMA "<schema>" TO <username>;
    ALTER ROLE <username> WITH REPLICATION;
    

II. Set up PostgreSQL for Replication #

  1. Open the PostgreSQL configuration file postgresql.conf:

    vi $PGDATA/postgresql.conf
    
  2. Change the following parameters:

    wal_level = logical
    max_replication_slots = 1 #Can be increased if more slots need to be created
    
  3. To perform log consumption for CDC replication from the PostgreSQL server, you must do either of the following:

    • Use the test_decoding plugin that is by default installed in PostgreSQL.
    • Install the logical decoding plugin wal2json.

    See the following two sections for instructions on how to set up these plugins.

    Instructions for using wal2json #

    1. Follow the instructions in the wal2json project README to Install the wal2json plugin.

    2. Create a logical replication slot for the catalog to be replicated with the following SQL:

      SELECT 'init' FROM pg_create_logical_replication_slot('<replication_slot_name>', 'wal2json');
      
    3. Verify the slot has been created:

      SELECT * from pg_replication_slots;
      

    Instructions for using test_decoding #

    If you’re using the test_decoding plugin, you don’t need to install anything as it comes pre-installed with PostgreSQL.

    1. Use the following SQL to create a logical replication slot for the test_decoding plugin:

      SELECT 'init' FROM pg_create_logical_replication_slot('<replication_slot_name>', 'test_decoding');
      
    2. Verify that the slot has been created:

      SELECT * from pg_replication_slots;
      
  4. Set the Replicant identity to FULL for the tables part of the replication process that do no have a primary key:

    ALTER TABLE <table_name> REPLICA IDENTITY FULL;
    

III. Set up Connection Configuration #

  1. From $REPLICANT_HOME, navigate to the connection configuration file:

    vi conf/conn/postgresql.yaml
    
  2. If you store your connection credentials in AWS Secrets Manager, you can tell Replicant to retrieve them. For more information, see Retrieve credentials from AWS Secrets Manager.

    Otherwise, you can put your credentials like usernames and passwords in plain form like the sample below:

    type: POSTGRESQL
    
    host: localhost #Replace localhost with your PostgreSQL host name
    port: 5432 #Replace the default port number 5432 if needed
    
    database: "postgres" #Replace postgres with your database name
    username: "replicant" #Replace replicant with your postgresql username
    password: "Replicant#123" #Replace Replicant#123 with your user's password
    
    max-connections: 30 #Maximum number of connections replicant can open in postgresql
    socket-timeout-s: 60 #The timeout value for socket read operations. The timeout is in seconds and a value of zero means that it is disabled.
    max-retries: 10 #Number of times any operation on the source system will be re-attempted on failures.
    retry-wait-duration-ms: 1000 #Duration in milliseconds Replicant should wait before performing then next retry of a failed operation.
    
    #List your replication slots (slots which hold the real-time changes of the source database) as follows
      replication-slots:
        io_replicate: #Replace "io-replicate" with your replication slot name
          - wal2json #plugin used to create replication slot (wal2json | test_decoding)
        io_replicate1: #Replace "io-replicate1" with your replication slot name
          - wal2json
    
    log-reader-type: {STREAM|SQL}
    

    The value of log-reader-type defaults to STREAM. If you choose STREAM, Replicant captures CDC data through PgReplicationStream. If you choose SQL, PostgreSQL server periodically receives SQL statements for CDC data extraction.

    Important:

    • Make sure that the max_connections in PostgreSQL exceeds the max_connections in the preceding connection configuration file.
    • From versions 23.03.01.12 and later, 23.03.31 and later, log-reader-type is deprecated. Avoid specifying this parameter.

  3. You can also enable SSL for your connection by including the ssl field and specifying the necessary parameters as below:

    ssl:
      ssl-cert: <full_path_to_SSL_certificate_file>
      root-cert: <full_path_to_SSL_root_certificate_file>
      ssl-key: <full_path_to_SSL_key_file>
    

    The key file must be in PKCS-12 or in PKCS-8 DER format. A PEM key can be converted to DER format using the following openssl command:

    openssl pkcs8 -topk8 -inform PEM -in postgresql.key -outform DER -out postgresql.pk8 -v1 PBE-MD5-DES
    
The socket-timeout-s parameter has been introduced in v22.02.12.16 and isn’t available in previous versions.

If the log-reader-type is set to STREAM, the replication connection must be allowed as the that will be used to perform the replication. To enable replication connection, the pg_hba.conf file needs to be modified with some of the following entries depending on the usecase:

  1. From $REPLICANT_HOME, navigate to the pg_hba file:
    vi $PGDATA/pg_hba.conf
    
  2. Make the necessary changes as follows:
    # TYPE  DATABASE        USER                  ADDRESS                 METHOD
    
    # allow local replication connection to <username> (IPv4 + IPv6)
    local     replication         <username>                                         trust
    host      replication         <username>    127.0.0.1/32                     <auth-method>
    host      replication         <username>    ::1/128                          <auth-method>
    
    # allow remote replication connection from any client machine  to <username> (IPv4 + IPv6)
    host     replication          <username>    0.0.0.0/0                        <auth-method>
    host     replication          <username>    ::0/0                            <auth-method>
    

IV. Setup Filter Configuration #

  1. From $REPLICANT_HOME, navigate to the filter configuration file:

    vi filter/postgresql_filter.yaml
    
  2. In accordance to you replication needs, specify the data which is to be replicated. Use the format of the example explained below:

    allow:
      #In this example, data of object type Table in the catalog postgres and schema public will be replicated
      catalog: "postgres"
      schema: "public"
      types: [TABLE]
    
      #From catalog postgres and schema public, only the CUSTOMERS, ORDERS, and RETURNS tables will be replicated.
      #Note: Unless specified, all tables in the catalog will be replicated
      allow:
        CUSTOMERS:
        #Within CUSTOMERS, the FB and IG columns will be replicated  
          allow: ["FB, IG"]
    
    
        ORDERS:  
          #Within ORDERS, only the product and service columns will be replicated as long as they meet the condition o_orderkey < 5000
          allow: ["product", "service"]
          conditions: "o_orderkey < 5000"
    
    
    
        RETURNS: #All columns in the table PART will be replicated without any predicates
    

    The following is a template of the format you must follow:

    allow:
      catalog: <your_catalog_name>
      schema: <your_schema_name>
      types: <your_object_type>
    
    #If not collections are specified, all the data tables in the provided catalog and schema will be replicated
    allow:
      <your_table_name>:
        allow: ["your_column_name"]
        condtions: "your_condition"
    
      <your_table_name>:  
        allow: ["your_column_name"]
        conditions: "your_condition"
    
      <your_table_name>:    
      ```
    

For a detailed explanation of configuration parameters in the filter file, read: Filter Reference

V. Set up Extractor Configuration #

To configure replication according to your requirements, specify your configuration in the Extractor configuration file. You can find a sample Extractor configuration file postgresql.yaml in the $REPLICANT_HOME/conf/src directory. For a detailed explanation of configuration parameters in the Extractor file, see Extractor Reference.

You can configure the following replication modes by specifying the parameters under their respective sections in the configuration file:

  • snapshot
  • realtime
  • delta-snapshot

See the following sections for more information.

For more information about different Replicant modes, see Running Replicant.

Configure snapshot replication #

The following is a sample configuration for operating in snapshot mode:

snapshot:
  threads: 16
  fetch-size-rows: 5_000

  _traceDBTasks: true
  min-job-size-rows: 1_000_000
  max-jobs-per-chunk: 32

  per-table-config:
  - catalog: tpch
    schema: public
    tables:
      lineitem:
        row-identifier-key: [l_orderkey, l_linenumber]
        split-key: l_orderkey
        split-hints:
          row-count-estimate: 15000
          split-key-min-value: 1
          split-key-max-value: 60_00

For more information about the configuration parameters for snapshot mode, see Snapshot Mode.

Configure realtime replication #

For realtime replication, you must create a heartbeat table in the source PostgreSQL database.

  1. Create a heartbeat table in any schema of the database you are going to replicate with the following DDL:

    CREATE TABLE "<user_database>"."public"."replicate_io_cdc_heartbeat"("timestamp" INT8 NOT NULL, PRIMARY  KEY("timestamp"))
    
  2. Grant INSERT, UPDATE, and DELETE privileges to the user configured for replication.

  3. Specify your configuration under the realtime section of the Extractor configuration file. For example:

    realtime:
      heartbeat:
        enable: true
        catalog: "postgres"
        schema: "public"
        table-name: replicate_io_cdc_heartbeat
        column-name: timestamp
    

For more information about the configuration parameters for realtime mode, see Realtime Mode.

ddl-replication [v23.01] #

This parameter is for enabling and configuring the auto-reinit functionality for PostgreSQL source. If ddl-replication is enabled, Replicant detects any DDL change on source table and reinitializes the table to sync the schema.

The following options are available:

Option Description Allowed values
enable Whether to enable auto-reinit. If enabled, Replicant detects DDLs on source.
  • true
  • false
ddl-replication-mode Mode of DDL replication.
  • REINIT
  • INLINE

Default: REINIT

detect-ddl-interval The interval of DDL detection in seconds. For example, if set to 600, Replicant detects DDLs after every 600 seconds.

The following is a sample realtime configuration that uses the auto-reinit feature:

realtime:
  threads: 4 
  fetch-size-rows: 10000
  fetch-duration-per-extractor-slot-s: 3
  _traceDBTasks: true
  ddl-replication:
    enable: true
    detect-ddl-interval: 600

  heartbeat:
    enable: true
    catalog: tpch
    schema: public
    interval-ms: 10000

Replication without replication-slots #

If you’re unable to create replication slots in PostgreSQL using either wal2json or test_decoding, then you can use a third mode of replication called delta-snapshot. In delta-snapshot, Replicant uses PostgreSQL’s internal column to identify changes.

Caution: We strongly recommend that you specify a row-identifier-key in the per-table-config section for a table which does not have a primary key or a unique key defined.

You can specify your configuration under the delta-snapshot section of the Extractor configuration file. For example:

delta-snapshot:
  row-identifier-key: [orderkey,suppkey]
  update-key: [partkey]
  replicate-deletes: true|false

  per-table-config:
  - catalog: tpch
    schema: public
    tables:
      lineitem1:
        row-identifier-key: [l_orderkey, l_linenumber]
        split-key: l_orderkey
        replicate-deletes: false

For more information about the configuration parameters for delta-snapshot mode, see Delta-snapshot Mode.