Source IBM Db2 with Kafka/MQ #

This page describes how to set up Source Db2 with Kafka and MQ.

Obtain the dependency files #

Arcion Replicant requires the Db2 JDBC driver as a dependency. If you use IBM MQ to store CDC logs, you must also obtain the MQ Java client files.

Click the following buttons to download the appropriate versions of the JDBC driver and client files. After downloading, put the JAR files inside the lib/ directory of your Replicant self-hosted download folder.

  • Download IBM Db2 JDBC driver
  • Download IBM MQ Java client files

I. Check permissions #

Verify that the user possesses the necessary permissions on the source Db2 instance to perform replication. For more information about these permissions, see IBM Db2 permissions.

II. Enable CDC replication for Db2 MQ #

To enable CDC-based replication from the Source Db2 MQ server, follow the instructions in Enabling CDC replication for Db2 MQ.

III. Create the heartbeat table #

For CDC replication, you must create the heartbeat table on the source database:

CREATE TABLE "tpch"."replicate_io_cdc_heartbeat"("timestamp" BIGINT NOT NULL, 
CONSTRAINT "cdc_heartbeat_id_default_default" PRIMARY KEY("timestamp"))

IV. Set up connection configuration #

Specify your connection details to Replicant with a connection configuration file. You can find a sample connection configuration file db2_src.yaml in the $REPLICANT_HOME/conf/conn directory.

Configure Db2 server connection #

To connect to Db2 with basic username and password authentication, you have these two options:

You can specify your credentials in plain form in the connection configuration file like the following sample:

type: DB2

database: tpch #Name of the catalog from which the tables are to be replicated
host: localhost
port: 50002

username: replicant
password: "Replicant#123"

max-connections: 30

max-retries: 10
retry-wait-duration-ms: 1000

proxy-source: false

If you set proxy-source to true, Replicant doesn't attempt to connect to the source database. You can enable it for real-time mode if the log exists in a storage space separate from the source database.

You can store your connection credentials in a secrets management service and tell Replicant to retrieve the credentials. For more information, see Secrets management.

Configure CDC logs and monitoring #

For CDC-based replication from source Db2 server, you can choose between IBM MQ and Kafka as the storage for Db2 logs. To specify the storage type, set the cdc-log-storage parameter to one of the following:

  • MQ
  • KAFKA_TRANSACTIONAL
  • KAFKA_EVENTUAL

All CDC log and monitoring configurations start with the parent field cdc-log-config.

To use IBM MQ for CDC logs, set cdc-log-storage to MQ.

Connection details #

mqueue-connections

If you enable real-time replication and use IBM MQ for CDC logs, then you need to specify MQ connection information in this field. To connect to MQ, you can either specify credentials in plain text form, or use SSL.
Specify MQ connection credentials in plain text

host

The host running the MQ queue manager.

port

The port number on the MQ queue manager host.

username

The username to connect to the MQ queue manager.

password

The password associated with the username.

Use SSL to connect to MQ

To use an SSL connection, specify the SSL details under the ssl field, as shown in the sketch after these parameter descriptions. The following details are required:

trust-store

Details about the TrustStore. The following details are required:

  • path. Path to TrustStore.
  • password. Password for the TrustStore.

key-store

Details about the KeyStore. The following details are required:

  • path. Path to KeyStore.
  • password. Password for the KeyStore.

You must specify this parameter if you have two-way authentication enabled on MQ.

ssl-cipher-suite

Provide the encryption algorithm that matches the configuration on the MQ manager, for example, TLS_RSA_WITH_AES_128_CBC_SHA256.
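
The following configuration is a minimal sketch of an MQ connection that uses SSL. The host, paths, passwords, and queue names are placeholders, and the layout assumes that trust-store, key-store, and ssl-cipher-suite sit directly under the ssl field as described above:

mqueue-connections:
  queue-conn1:
    host: localhost
    port: 1450
    queue-manager: CDC_QM
    queue-channel: CDC_Q_CHANNEL
    ssl:
      trust-store:
        path: "/path/to/truststore.jks"   #Placeholder TrustStore path
        password: "truststore_password"
      key-store:
        path: "/path/to/keystore.jks"     #Placeholder KeyStore, needed for two-way authentication
        password: "keystore_password"
      ssl-cipher-suite: TLS_RSA_WITH_AES_128_CBC_SHA256
    queues:
    - name: CDC_LOG_Q
    message-format: XML
    message-type: ROW
    lob-send-option: INLINE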

queue-manager

Name of the queue manager to connect to.

queue-channel

The name of the channel to connect to on the queue manager.

queues

List of IBM MQ queues to connect to.

name

Name of IBM MQ queue.

message-format

The format of messages from IBM MQ:

  • XML
  • DELIMITED

message-type [v21.02.01.8]

Specifies the type of message from IBM MQ:

  • ROW
  • TRANSACTION

The message type depends on the MESSAGE CONTENT TYPE that you set using PubQMap: for R as MESSAGE CONTENT TYPE, set message-type to ROW; for T, set message-type to TRANSACTION.

lob-send-option

Specifies whether LOB columns are inlined or received in separate MQ messages:

  • INLINE
  • SEPARATE

backup-mqueue-connection [v20.04.06.1]

Connection details for the backup MQ manager.

Specifying this configuration allows Replicant to seamlessly fall back to the backup MQ manager when the primary MQ manager goes down. You can configure all parameters for the backup MQ manager in the same way as for the primary MQ manager.

Sample configuration #

cdc-log-config:
  cdc-log-storage: MQ
  mqueue-connections:
    queue-conn1:
      host: localhost
      port: 1450
      queue-manager: CDC_QM
      queue-channel: CDC_Q_CHANNEL
      username: queue_manager
      password: queue_manager
      queues:
      - name: CDC_LOG_Q
      message-format: XML
      message-type: ROW
      lob-send-option: INLINE
      backup-mqueue-connection:
        host: localhost
        port: 1460
        queue-manager: CDC_QM_BACKUP
        queue-channel: CDC_Q_BACKUP_CHANNEL
        username: backup_queue_manager
        password: backup_queue_manager
        queues:
        - name: CDC_LOG_BACKUP_Q

To use Kafka for CDC logs, set cdc-log-storage to one of the following types:

  • KAFKA_TRANSACTIONAL
  • KAFKA_EVENTUAL

Kafka log storage overview #

  • For KAFKA_TRANSACTIONAL as cdc-log-storage, Replicant makes the following assumptions based on the value of message-format:

    • If you set message-format to XML or DELIMITED, then the key of a record is the MQ MessageId and the value is the MQMessage in XML or DELIMITED format.
    • If you set message-format to KCOP_MULTIROW_AUDIT, then cdc-log-topic corresponds to the topic name of the COMMIT-STREAM topic associated with the subscription. Replicant replicates the topic in a transactionally consistent manner.
  • For KAFKA_EVENTUAL as cdc-log-storage, the topic name follows the format cdc-log-topic-prefix.<table_name>. This format follows the IBM IIDR naming scheme for Kafka topics. For examples, see the sample configurations in the following sections.

By default, IIDR creates a Kafka topic using the following naming convention:

<datastore_name>.<subscription_name>.<database>.<schema>.<table>

In this case, set cdc-log-topic-prefix to the following:

<datastore_name>.<subscription_name>.<database>.<schema>
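
For example, with a hypothetical datastore DS1, subscription SUB1, database TPCH, and schema DB2USER, IIDR would create topics named DS1.SUB1.TPCH.DB2USER.<table>, so the prefix would look like the following sketch:

cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix: "DS1.SUB1.TPCH.DB2USER"  #Hypothetical datastore, subscription, database, and schema names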

Connection details #

The connection details for Kafka start with the parent kafka-connection field. It contains the following parameters:

cdc-log-topic

The Kafka topic containing the Db2 CDC log. Applicable when using KAFKA_TRANSACTIONAL as cdc-log-storage.

The following parameters are required:

cdc-log-topic-prefix [v20.12.04.7]

The common prefix for all Kafka topics undergoing replication. Applicable when you set cdc-log-storage to either KAFKA_EVENTUAL or KAFKA_AVRO.

cdc-log-topic-prefix-list [v21.02.01.19]

List of mappings from common prefixes to source tables:

  • cdc-log-topic-prefix. The common prefix for all Kafka IIDR topics.
  • tables. An array of table names.
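
The following is a minimal sketch of cdc-log-topic-prefix-list that maps two hypothetical prefixes to their tables; the prefix values and table names are placeholders:

cdc-log-topic-prefix-list:
- cdc-log-topic-prefix: "DS1.SUB1.TPCH.DB2USER"  #Hypothetical prefix
  tables: [lineitem, orders]
- cdc-log-topic-prefix: "DS1.SUB2.TPCH.DB2USER"  #Hypothetical prefix
  tables: [partsupp]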

message-format

Format of messages received from Kafka:

  • XML
  • DELIMITED
  • KCOP_MULTIROW_AUDIT

message-type

Specifies the message type:

  • ROW
  • TRANSACTION

This parameter applies only when you set message-format to XML.

lob-send-option

Specifies whether LOB columns are inlined or received in separate MQ messages:

  • INLINE
  • SEPARATE

connection

Connection configuration to connect to Kafka.

max-poll-records

The maximum number of records returned in a single call to poll(). For more information, see the max.poll.records consumer configuration.

Default: 1000.

max-poll-interval-ms

The maximum delay in milliseconds between invocations of poll() when using consumer group management. For more information, see the max.poll.interval.ms consumer configuration.

Default: 900000.
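
As a sketch, the following shows where these two consumer tuning parameters sit, directly under kafka-connection alongside the other parameters described above; the values shown are the defaults:

kafka-connection:
  max-poll-records: 1000         #Maximum records returned per poll() call
  max-poll-interval-ms: 900000   #Maximum delay between poll() invocations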

For better understanding, see the sample configurations in the following sections.

Sample configuration for KAFKA_TRANSACTIONAL #

cdc-log-config:
  cdc-log-storage: KAFKA_TRANSACTIONAL
  kafka-connection:
    cdc-log-topic: cdc_log_topic
    message-format: XML
    message-type: ROW
    lob-send-option: INLINE
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092

Sample configuration for KAFKA_EVENTUAL #

cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix: ""
    cdc-log-topic-prefix-list:
    - cdc-log-topic-prefix: ""
      tables: [table1, table2]
    - cdc-log-topic-prefix: ""
      tables: [table3, table4]

    message-format: KCOP_MULTIROW_AUDIT
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
      schema-registry-url: "http://localhost:8081"
      consumer-group-id: blitzz

V. Set up Extractor configuration #

To configure replication mode according to your requirements, specify your configuration in the Extractor configuration file. You can find a sample Extractor configuration file db2.yaml in the $REPLICANT_HOME/conf/src directory.

Configure snapshot mode #

For snapshot mode, you can make use of the following sample:

snapshot:
  threads: 16
  fetch-size-rows: 5_000

  _traceDBTasks: true
  fetch-schemas-from-system-tables: true

  per-table-config:
  - catalog: tpch
    schema: db2user
    tables:
      lineitem:
        row-identifier-key: [l_orderkey, l_linenumber]

For more information about the Extractor parameters for snapshot mode, see Snapshot mode.

Configure delta-snapshot mode #

For delta-snapshot mode, you can make use of the following sample:

delta-snapshot:
  #threads: 32
  #fetch-size-rows: 10_000

  #min-job-size-rows: 1_000_000
  max-jobs-per-chunk: 32
  _max-delete-jobs-per-chunk: 32

  delta-snapshot-key: last_update_time
  delta-snapshot-interval: 10
  delta-snapshot-delete-interval: 10
  _traceDBTasks: true
  replicate-deletes: false

  per-table-config:
    - catalog: tpch
      schema: db2user
      tables:
        #      testTable
        #        split-key: split-key-column
        #        split-hints:
        #          row-count-estimate: 100000 
        #          split-key-min-value: 1 
        #          split-key-max-value: 60_000
        #        delta-snapshot-key: delta-snapshot-key-column
        #        row-identifier-key: [col1, col2]
        #        update-key: [col1, col2]
        partsupp:
          split-key: partkey

For more information about the Extractor parameters for delta-snapshot mode, see Delta-snapshot mode.

Configure realtime mode #

For real-time replication, the start-position parameter specifying the starting log position follows a different structure for Db2 MQ and Db2 Kafka. For more information, see the following two samples:

Sample Extractor configurations for realtime mode #

The following sample applies to realtime replication with IBM MQ as the CDC log storage:

realtime:
  #threads: 1
  #fetch-size-rows: 10000
  _traceDBTasks: true
  #fetch-interval-s: 0
  replicate-empty-string-as-null: true

#  start-position:
#    commit-time: '2020-08-24 08:16:38.019002'
# idempotent-replay: false

  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    #table-name: replicate_io_cdc_heartbeat
    #column-name: timestamp
    interval-ms: 10000

In the preceding sample, notice the following details:

  • The start-position parameter specifies the starting log position for real-time replication. For more information, see Db2 with MQ in Extractor Reference.
  • If you set message-format to DELIMITED, set replicate-empty-string-as-null to true.

The following sample applies to realtime replication with Kafka as the CDC log storage:

realtime:
  #threads: 1
  #fetch-size-rows: 10000
  _traceDBTasks: true
  #fetch-interval-s: 0
  replicate-empty-string-as-null: true

#  start-position:
#    start-offset: LATEST
# idempotent-replay: false

  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    #table-name: replicate_io_cdc_heartbeat
    #column-name: timestamp
    interval-ms: 10000

In the sample above, notice the following details:

  • The start-position parameter specifies the starting log position for realtime replication. For more information, see Db2 with Kafka in Extractor Reference.
  • If you set message-format to DELIMITED, set replicate-empty-string-as-null to true.

For more information about the Extractor parameters for realtime mode, see Realtime mode.