Source IBM Db2 with Kafka/MQ #
This page describes how to set up Source Db2 with Kafka and MQ.
The extracted replicant-cli will be referred to as the $REPLICANT_HOME directory in the following steps.
I. Check Permissions #
Verify that the necessary permissions are in place on the source Db2 server to perform replication. For details about the permissions, see IBM Db2 Permissions.
II. Enable CDC Replication for Db2 MQ #
To enable CDC-based replication from the source Db2 MQ server, follow the instructions in Enabling CDC Replication for Db2 MQ.
III. Create the Heartbeat Table #
For CDC replication, you must create the heartbeat table on the source database with the following DDL:
CREATE TABLE "tpch"."replicate_io_cdc_heartbeat"("timestamp" BIGINT NOT NULL,
CONSTRAINT "cdc_heartbeat_id_default_default" PRIMARY KEY("timestamp"))
IV. Set up Connection Configuration #
- From $REPLICANT_HOME, navigate to the sample connection configuration file:
  vi conf/conn/db2.yaml
- The configuration file has two parts:
  - Parameters related to source Db2 server connection.
  - Parameters related to CDC logs and monitoring.
Parameters Related to Source Db2 server connection #
If you store your connection credentials in AWS Secrets Manager, you can tell Replicant to retrieve them. For more information, see Retrieve credentials from AWS Secrets Manager.
Otherwise, you can specify credentials such as usernames and passwords in plain text, as in the sample below:
type: DB2
database: tpch #Name of the catalog from which the tables are to be replicated
host: localhost
port: 50002
username: replicant
password: "Replicant#123"
max-connections: 30
max-retries: 10
retry-wait-duration-ms: 1000
#proxy-source: false
- If you set proxy-source to true, Replicant will not attempt to connect to the source database. You can enable it for real-time mode if the log is stored separately from the source database.
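As a minimal sketch of the proxy-source note, the connection sample above could be adapted as follows for real-time mode when the CDC log lives in separate storage. All values are the sample's placeholders, and keeping the remaining connection parameters alongside proxy-source is an assumption rather than a documented requirement.

```yaml
type: DB2
database: tpch            # catalog from which the tables are replicated
host: localhost
port: 50002
username: replicant
password: "Replicant#123"
max-connections: 30
max-retries: 10
retry-wait-duration-ms: 1000
proxy-source: true        # Replicant does not attempt to connect to the source database
```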
Parameters Related to CDC Logs and Monitoring #
For CDC-based replication from the source Db2 server, you can choose between IBM MQ and Kafka as the storage for Db2 logs. All CDC log and monitoring configurations live under the cdc-log-config field. You specify the storage type with the cdc-log-storage parameter. Note the following details about each storage type that you can use:
CDC Log Storage type #
For IBM MQ as cdc-log-storage, the following parameters are available for you to configure:
i. mqueue-connections: If you enable realtime replication and use IBM MQ for CDC logs, you need to specify MQ connection information in this field. Each connection can have the following parameters:
- host: The host on which the MQ queue manager is running.
- port: The port number to connect to the MQ queue manager.
- queue-manager: Name of the queue manager to connect to.
- queue-channel: The name of the channel to connect to on the queue manager.
- username: The username to connect to the MQ queue manager.
- password: The associated password.
- queues: List of IBM MQ queues to connect to.
  - name: Name of the IBM MQ queue.
  - message-format: Format of the messages received from IBM MQ. Allowed values are XML and DELIMITED.
  - message-type [21.02.01.8]: Type of the messages received from IBM MQ. Allowed values are ROW and TRANSACTION.
  - lob-send-option: Whether LOB columns are inlined or received in separate MQ messages. Allowed values are INLINE and SEPARATE.
- ssl:
  - trust-store:
    - path: Path to the truststore.
    - password: Password for the truststore.
  - key-store: You’ll need this if you have 2-way authentication enabled on MQ.
    - path: Path to the keystore.
    - password: Password for the keystore.
  - ssl-cipher-suite: The encryption algorithm, based on what is configured on the MQ manager.
- backup-mqueue-connection [20.04.06.1]: Connection details for the backup MQ manager. Providing this configuration allows Replicant to fail over seamlessly to the backup MQ manager when the primary MQ manager is down. You can configure all parameters for the backup MQ manager in the same way as for the primary MQ manager.
  - host: The host on which the MQ queue manager is running.
  - port: The port number to connect to the MQ queue manager.
  - queue-manager: Name of the queue manager to connect to.
  - queue-channel: The name of the channel to connect to on the queue manager.
  - username: The username to connect to the MQ queue manager.
  - password: The associated password.
  - queues: List of IBM MQ queues to connect to.
    - name: Name of the IBM MQ queue.
    - message-format: Format of the messages received from IBM MQ. Allowed values are XML and DELIMITED.
    - message-type [21.02.01.8]: Type of the messages received from IBM MQ. Allowed values are ROW and TRANSACTION.
    - lob-send-option: Whether LOB columns are inlined or received in separate MQ messages. Allowed values are INLINE and SEPARATE.
  - ssl:
    - trust-store:
      - path: Path to the truststore.
      - password: Password for the truststore.
    - key-store: You’ll need this if you have 2-way authentication enabled on MQ (for example, client authentication).
      - path: Path to the keystore.
      - password: Password for the keystore.
    - ssl-cipher-suite: The encryption algorithm configured on MQ.
Note: You can configure the message-type of queues to ROW or TRANSACTION depending on the value of the MESSAGE CONTENT TYPE that you set using PubQMap. If it’s set to R, then message-type can be ROW. If it’s set to T, then message-type can be TRANSACTION.
Below is a sample CDC Log configuration using MQ as cdc-log-storage:
cdc-log-config:
  cdc-log-storage: MQ
  mqueue-connections:
    queue-conn1:
      host: localhost
      port: 1450
      queue-manager: CDC_QM
      queue-channel: CDC_Q_CHANNEL
      username: queue_manager
      password: queue_manager
      queues:
      - name: CDC_LOG_Q
        #message-format: XML
        #message-type: ROW
        #lob-send-option: INLINE
# backup-mqueue-connection:
#   host: localhost
#   port: 1460
#   queue-manager: CDC_QM_BACKUP
#   queue-channel: CDC_Q_BACKUP_CHANNEL
#   username: backup_queue_manager
#   password: backup_queue_manager
#   queues:
#   - name: CDC_LOG_BACKUP_Q
#   ssl:
#     trust-store:
#       path: "/path/to/trust/store"
#       password: 'changeit'
#     key-store:
#       path: "/path/to/key/store"
#       password: 'changeit'
#     ssl-cipher-suite: 'TLS_RSA_WITH_AES_128_CBC_SHA256'
#
      #- name: CDC_LOG_Q_DELIMITED
      #  message-format: DELIMITED
      #ssl:
      #  trust-store:
      #    path: "/path/to/trust/store"
      #    password: 'changeit'
      #  key-store:
      #    path: "/path/to/key/store"
      #    password: 'changeit'
      #  ssl-cipher-suite: 'TLS_RSA_WITH_AES_128_CBC_SHA256'
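To illustrate the note about message-type, here is a sketch of a queue entry that declares transaction messages explicitly. It assumes the publishing queue map for this queue was created with MESSAGE CONTENT TYPE set to T. The host, queue manager, channel, and queue names are the sample's placeholders, and the choice of XML as message-format is an assumption made only for the example.

```yaml
cdc-log-config:
  cdc-log-storage: MQ
  mqueue-connections:
    queue-conn1:
      host: localhost
      port: 1450
      queue-manager: CDC_QM
      queue-channel: CDC_Q_CHANNEL
      username: queue_manager
      password: queue_manager
      queues:
      - name: CDC_LOG_Q
        message-format: XML        # assumed for illustration; DELIMITED is the other allowed value
        message-type: TRANSACTION  # pairs with MESSAGE CONTENT TYPE T on the PubQMap
        lob-send-option: INLINE
```

If the queue map uses MESSAGE CONTENT TYPE R instead, the same entry would set message-type to ROW.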
If you choose Kafka for CDC logs, set cdc-log-storage to one of the following types:
KAFKA_TRANSACTIONAL
KAFKA_EVENTUAL
The connection details for Kafka live under the kafka-connection field. It contains the following parameters:
- cdc-log-topic: The Kafka topic that contains the Db2 CDC log. To be used when cdc-log-storage is KAFKA_TRANSACTIONAL.
- cdc-log-topic-prefix [20.12.04.7]: The common prefix for all Kafka topics that will be replicated. To be used when cdc-log-storage is KAFKA_EVENTUAL or KAFKA_AVRO.
- cdc-log-topic-prefix-list [21.02.01.19]: List of mappings from common prefixes to source tables.
  - cdc-log-topic-prefix: The common prefix for all Kafka IIDR topics.
  - tables: An array of table names.
- message-format: Format of the messages received from Kafka. Allowed values are XML, DELIMITED, and KCOP_MULTIROW_AUDIT.
- message-type: Message type. Allowed values are ROW and TRANSACTION. This parameter is valid only when message-format is set to XML.
- lob-send-option: Whether LOB columns are inlined or received in separate messages from Kafka. Allowed values are INLINE and SEPARATE.
- connection: Connection config for connecting to Kafka. For more information, see the sample configurations below.
Note:
For KAFKA_TRANSACTIONAL as cdc-log-storage, the following assumptions apply based on the value of message-format:
- If the message-format is XML/DELIMITED, the key of the record is assumed to be the MQMessageId and the value the MQMessage in XML/DELIMITED format.
- If the message-format is KCOP_MULTIROW_AUDIT, cdc-log-topic is assumed to be the name of the COMMIT-STREAM topic associated with the subscription that will be replicated in a transactionally consistent manner.
For KAFKA_EVENTUAL as cdc-log-storage, the topic name is assumed to be in the format cdc-log-topic-prefix.<table_name>. This assumption is based on the naming scheme IBM IIDR follows for Kafka topics. See the sample configuration below for better understanding.
By default, IIDR creates a Kafka topic using the following naming convention:
<datastore_name>.<subscription_name>.<database>.<schema>.<table>
In this case, you should set cdc-log-topic-prefix to the following:
<datastore_name>.<subscription_name>.<database>.<schema>
Sample configuration for KAFKA_TRANSACTIONAL #
cdc-log-config:
  cdc-log-storage: KAFKA_TRANSACTIONAL
  kafka-connection:
    cdc-log-topic: cdc_log_topic
    message-format: XML
    message-type: ROW
    lob-send-option: INLINE
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
Sample configuration for KAFKA_EVENTUAL #
cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix: ""
    cdc-log-topic-prefix-list:
    - cdc-log-topic-prefix: ""
      tables: [table1, table2]
    - cdc-log-topic-prefix: ""
      tables: [table3, table4]
    message-format: KCOP_MULTIROW_AUDIT
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
      schema-registry-url: "http://localhost:8081"
      consumer-group-id: blitzz
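To make the empty prefixes in the KAFKA_EVENTUAL sample concrete, the following sketch fills them in for a hypothetical IIDR setup. The datastore (ds1), subscriptions (sub1, sub2), and table names are invented for illustration and follow the default IIDR naming convention described in the note. The keys are those of the sample above; their indentation here is an assumption about how they nest.

```yaml
# Hypothetical IIDR topics (names are illustrative only):
#   ds1.sub1.tpch.db2user.lineitem
#   ds1.sub1.tpch.db2user.partsupp
#   ds1.sub2.tpch.db2user.orders
cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix-list:
    # Each prefix is <datastore_name>.<subscription_name>.<database>.<schema>
    - cdc-log-topic-prefix: "ds1.sub1.tpch.db2user"
      tables: [lineitem, partsupp]
    - cdc-log-topic-prefix: "ds1.sub2.tpch.db2user"
      tables: [orders]
    message-format: KCOP_MULTIROW_AUDIT
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
      schema-registry-url: "http://localhost:8081"
      consumer-group-id: blitzz
```

With these values, the topic for lineitem would be expected under the name ds1.sub1.tpch.db2user.lineitem, that is, the prefix followed by a dot and the table name.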
V. Set up Extractor Configuration #
- From $REPLICANT_HOME, navigate to the Extractor configuration file:
  vi conf/src/db2.yaml
- The Extractor configuration file has three parts:
  - Parameters related to snapshot mode.
  - Parameters related to delta snapshot mode.
  - Parameters related to realtime mode.
Parameters related to snapshot mode #
For snapshot mode, you can make use of the following sample:
snapshot:
  threads: 16
  fetch-size-rows: 5_000
  _traceDBTasks: true
  #fetch-schemas-from-system-tables: true
  per-table-config:
  - catalog: tpch
    schema: db2user
    tables:
      lineitem:
        row-identifier-key: [l_orderkey, l_linenumber]
Parameters related to delta snapshot mode #
For delta snapshot mode, you can make use of the following sample:
delta-snapshot:
  #threads: 32
  #fetch-size-rows: 10_000
  #min-job-size-rows: 1_000_000
  max-jobs-per-chunk: 32
  _max-delete-jobs-per-chunk: 32
  delta-snapshot-key: last_update_time
  delta-snapshot-interval: 10
  delta-snapshot-delete-interval: 10
  _traceDBTasks: true
  replicate-deletes: false
  per-table-config:
  - catalog: tpch
    schema: db2user
    tables:
      # testTable
      #   split-key: split-key-column
      #   split-hints:
      #     row-count-estimate: 100000
      #     split-key-min-value: 1
      #     split-key-max-value: 60_000
      #   delta-snapshot-key: delta-snapshot-key-column
      #   row-identifier-key: [col1, col2]
      #   update-key: [col1, col2]
      partsupp:
        split-key: partkey
Parameters related to realtime mode #
For realtime mode, the start-position parameter, which specifies the starting log position for realtime replication, is structured differently for Db2 MQ and Db2 Kafka. For more information, see the following two samples:
Sample Extractor realtime configurations #
realtime:
  #threads: 1
  #fetch-size-rows: 10000
  _traceDBTasks: true
  #fetch-interval-s: 0
  replicate-empty-string-as-null: true
  # start-position:
  #   commit-time: '2020-08-24 08:16:38.019002'
  #   idempotent-replay: false
  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    #table-name: replicate_io_cdc_heartbeat
    #column-name: timestamp
    interval-ms: 10000
In the preceding sample, notice the following details:
- The start-position parameter specifies the starting log position for realtime replication. For more information, see Db2 with MQ in Extractor Reference.
- If you’ve set message-format to DELIMITED, set replicate-empty-string-as-null to true.
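As a sketch of the Db2 with MQ case with start-position enabled, the commented lines of the sample can be activated as shown below. The timestamp is the sample's placeholder rather than a real log position, and the nesting of commit-time under start-position is an assumption based on the sample's layout.

```yaml
realtime:
  _traceDBTasks: true
  replicate-empty-string-as-null: true   # needed when message-format is DELIMITED
  start-position:
    # Placeholder value from the sample; replace with a commit time from your own log
    commit-time: '2020-08-24 08:16:38.019002'
  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    interval-ms: 10000
```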
realtime:
  #threads: 1
  #fetch-size-rows: 10000
  _traceDBTasks: true
  #fetch-interval-s: 0
  replicate-empty-string-as-null: true
  # start-position:
  #   start-offset: LATEST
  #   idempotent-replay: false
  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    #table-name: replicate_io_cdc_heartbeat
    #column-name: timestamp
    interval-ms: 10000
In the sample above, notice the following details:
- The start-position parameter specifies the starting log position for realtime replication. For more information, see Db2 with Kafka in Extractor Reference.
- If you’ve set message-format to DELIMITED, set replicate-empty-string-as-null to true.
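For the Db2 with Kafka case, a corresponding sketch activates start-position with the start-offset value shown in the sample. Nesting start-offset under start-position is an assumption based on the sample's layout, and LATEST is the only value this page shows for it.

```yaml
realtime:
  _traceDBTasks: true
  replicate-empty-string-as-null: true   # needed when message-format is DELIMITED
  start-position:
    start-offset: LATEST                 # value taken from the sample above
  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    interval-ms: 10000
```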