Source IBM Db2 with Kafka/MQ #
This page describes how to set up Source Db2 with Kafka and MQ.
Obtain the dependency files #
Arcion Replicant requires the Db2 JDBC driver as a dependency. If you use IBM MQ to store CDC logs, you must also obtain the MQ Java client files.
Download the appropriate versions of the JDBC driver and MQ client files. After downloading, put the JAR files inside the lib/ directory of your Replicant self-hosted download folder.
I. Check permissions #
Verify that the replication user possesses the necessary permissions on the source Db2 server to perform replication. For more information about the permissions, see IBM Db2 permissions.
II. Enable CDC replication for Db2 MQ #
To enable CDC-based replication from the Source Db2 MQ server, follow the instructions in Enabling CDC replication for Db2 MQ.
III. Create the heartbeat table #
For CDC replication, you must create the heartbeat table on the source database:
CREATE TABLE "tpch"."replicate_io_cdc_heartbeat"("timestamp" BIGINT NOT NULL,
CONSTRAINT "cdc_heartbeat_id_default_default" PRIMARY KEY("timestamp"))
IV. Set up connection configuration #
Specify your connection details to Replicant with a connection configuration file. You can find a sample connection configuration file db2_src.yaml in the $REPLICANT_HOME/conf/conn directory.
Configure Db2 server connection #
To connect to Db2 with basic username and password authentication, you can specify your credentials in plain form in the connection configuration file, as in the following sample:
type: DB2
database: tpch #Name of the catalog from which the tables are to be replicated
host: localhost
port: 50002
username: replicant
password: "Replicant#123"
max-connections: 30
max-retries: 10
retry-wait-duration-ms: 1000
proxy-source: false
If you set proxy-source to true, Replicant doesn’t attempt to connect to the source database. You can enable it for real-time mode if the logs exist in a separate storage space from the source database.
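For example, here is a minimal sketch of a proxy-source setup for real-time mode. It reuses the sample values above and only changes proxy-source to true, so that Replicant reads changes from MQ or Kafka without connecting to the Db2 server:

type: DB2

database: tpch
host: localhost
port: 50002

username: replicant
password: "Replicant#123"

max-connections: 30
proxy-source: true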
Configure CDC logs and monitoring #
For CDC-based replication from the source Db2 server, you can choose between IBM MQ and Kafka as the storage for Db2 logs. To specify the storage type, set the cdc-log-storage parameter to one of the following:

- `MQ`
- `KAFKA_TRANSACTIONAL`
- `KAFKA_EVENTUAL`
All CDC log and monitoring configurations start with the parent field cdc-log-config.
To use IBM MQ for CDC logs, set cdc-log-storage to MQ.
Connection details #
- `mqueue-connections`

  If you enable real-time replication and use IBM MQ for CDC logs, specify the MQ connection information in this field. You specify each MQ connection under a connection name, for example `queue-conn1` in the sample configuration below. To connect to MQ, you can either specify credentials in plain text form, or use SSL.

  To specify MQ connection credentials in plain text, use the following parameters:

  - `host`: The host running the MQ queue manager.
  - `port`: The port number on the MQ queue manager host.
  - `username`: The username to connect to the MQ queue manager.
  - `password`: The password associated with `username`.

  To use an SSL connection instead, specify the SSL details under the `ssl` field (see the SSL sketch after the sample configuration below). The following details are required:

  - `trust-store`: Details about the TrustStore:
    - `path`: Path to the TrustStore.
    - `password`: Password for the TrustStore.
  - `key-store`: Details about the KeyStore:
    - `path`: Path to the KeyStore.
    - `password`: Password for the KeyStore.

    You must specify this parameter if you have two-way authentication enabled on MQ.
  - `ssl-cipher-suite`: The encryption algorithm, based on the configuration of the MQ queue manager, for example, `TLS_RSA_WITH_AES_128_CBC_SHA256`.

  The following parameters describe the queue manager and queues to read from:

  - `queue-manager`: Name of the queue manager to connect to.
  - `queue-channel`: The name of the channel to connect to on the queue manager.
  - `queues`: List of IBM MQ queues to connect to. Each queue takes the following parameters:
    - `name`: Name of the IBM MQ queue.
    - `message-format`: The format of messages from IBM MQ. Allowed values are `XML` and `DELIMITED`.
    - `message-type` [v21.02.01.8]: The type of messages from IBM MQ. Allowed values are `ROW` and `TRANSACTION`. The message type depends on the value of the `MESSAGE CONTENT TYPE` that you set using `PubQMap`. For `R` as `MESSAGE CONTENT TYPE`, set `message-type` to `ROW`. For `T` as `MESSAGE CONTENT TYPE`, set `message-type` to `TRANSACTION`.
    - `lob-send-option`: Specifies whether LOB columns are inlined or received in separate MQ messages. Allowed values are `INLINE` and `SEPARATE`.
  - `backup-mqueue-connection` [v20.04.06.1]: Connection details for the backup MQ queue manager. Specifying this configuration allows Replicant to seamlessly fall back to the backup MQ queue manager when the primary one goes down. You can configure all the parameters for the backup MQ connection in the same way as for the primary MQ connection.
Sample configuration #
cdc-log-config:
  cdc-log-storage: MQ
  mqueue-connections:
    queue-conn1:
      host: localhost
      port: 1450
      queue-manager: CDC_QM
      queue-channel: CDC_Q_CHANNEL
      username: queue_manager
      password: queue_manager
      queues:
      - name: CDC_LOG_Q
        message-format: XML
        message-type: ROW
        lob-send-option: INLINE
      backup-mqueue-connection:
        host: localhost
        port: 1460
        queue-manager: CDC_QM_BACKUP
        queue-channel: CDC_Q_BACKUP_CHANNEL
        username: backup_queue_manager
        password: backup_queue_manager
        queues:
        - name: CDC_LOG_BACKUP_Q
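The sample above connects with plain credentials. If you use SSL instead, the `ssl` block carries the TrustStore and KeyStore details. The following minimal sketch assumes the `ssl` block sits inside the queue connection alongside `host` and `port`; the paths, passwords, and cipher suite are placeholders that you must replace with values matching your MQ queue manager configuration:

cdc-log-config:
  cdc-log-storage: MQ
  mqueue-connections:
    queue-conn1:
      host: localhost
      port: 1450
      queue-manager: CDC_QM
      queue-channel: CDC_Q_CHANNEL
      ssl:
        trust-store:
          path: "/path/to/truststore.jks"
          password: "truststore_password"
        key-store:
          path: "/path/to/keystore.jks"
          password: "keystore_password"
        ssl-cipher-suite: "TLS_RSA_WITH_AES_128_CBC_SHA256"
      queues:
      - name: CDC_LOG_Q
        message-format: XML
        message-type: ROW
        lob-send-option: INLINE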
To use Kafka for CDC logs, set cdc-log-storage to one of the following types:

- `KAFKA_TRANSACTIONAL`
- `KAFKA_EVENTUAL`
Kafka log storage overview #
- For `KAFKA_TRANSACTIONAL` as `cdc-log-storage`, the following assumptions apply based on the value of `message-format`:
  - If you set `message-format` to `XML` or `DELIMITED`, then the key of a record is the MQ `MessageId` and the value is the MQ `Message` in `XML` or `DELIMITED` format.
  - If you set `message-format` to `KCOP_MULTIROW_AUDIT`, then `cdc-log-topic` corresponds to the topic name of the `COMMIT-STREAM` topic associated with the subscription. Replicant replicates the topic in a transactionally consistent manner.
- For `KAFKA_EVENTUAL` as `cdc-log-storage`, the topic name follows the format `cdc-log-topic-prefix.<table_name>`. This format follows the IBM IIDR naming scheme for Kafka topics. See the sample configurations in the following sections for better understanding.
By default, IIDR creates a Kafka topic using the following naming convention:
<datastore_name>.<subscription_name>.<database>.<schema>.<table>
In this case, set cdc-log-topic-prefix to the following:
<datastore_name>.<subscription_name>.<database>.<schema>
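For example, suppose the datastore name is `CDCSTORE`, the subscription name is `SUB1`, the database is `TESTDB`, and the schema is `DB2USER` (all hypothetical values). IIDR then creates topics such as `CDCSTORE.SUB1.TESTDB.DB2USER.LINEITEM`, and you would set the prefix as follows:

cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix: "CDCSTORE.SUB1.TESTDB.DB2USER"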
Connection details #
The connection details for Kafka start with the parent kafka-connection field. It contains the following parameters:
- `cdc-log-topic`

  The Kafka topic containing the Db2 CDC log. Applicable when you use `KAFKA_TRANSACTIONAL` as `cdc-log-storage`.

- `cdc-log-topic-prefix` [v20.12.04.7]

  The common prefix for all Kafka topics undergoing replication. Applicable when you set `cdc-log-storage` to either `KAFKA_EVENTUAL` or `KAFKA_AVRO`.

- `cdc-log-topic-prefix-list` [v21.02.01.19]

  List of mappings from common prefixes to source tables. The following parameters are required:

  - `cdc-log-topic-prefix`: The common prefix for all Kafka IIDR topics.
  - `tables`: An array of table names.

- `message-format`

  Format of messages received from Kafka. Allowed values are `XML`, `DELIMITED`, and `KCOP_MULTIROW_AUDIT`.

- `message-type`

  Specifies the message type. Allowed values are `ROW` and `TRANSACTION`. This parameter applies only when you set `message-format` to `XML`.

- `lob-send-option`

  Specifies whether LOB columns are inlined or received in separate messages. Allowed values are `INLINE` and `SEPARATE`.

- `connection`

  Connection configuration to connect to Kafka. It contains the following parameters:

  - `max-poll-records`

    The maximum number of records returned in a single call to `poll()`. For more information, see the `max.poll.records` consumer configuration.

    Default: `1000`.

  - `max-poll-interval-ms`

    The maximum delay in milliseconds between invocations of `poll()` when using consumer group management. For more information, see the `max.poll.interval.ms` consumer configuration.

    Default: `900000`.
Sample configuration for KAFKA_TRANSACTIONAL #
cdc-log-config:
  cdc-log-storage: KAFKA_TRANSACTIONAL
  kafka-connection:
    cdc-log-topic: cdc_log_topic
    message-format: XML
    message-type: ROW
    lob-send-option: INLINE
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
Sample configuration for KAFKA_EVENTUAL #
cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix: ""
    cdc-log-topic-prefix-list:
    - cdc-log-topic-prefix: ""
      tables: [table1, table2]
    - cdc-log-topic-prefix: ""
      tables: [table3, table4]
    message-format: KCOP_MULTIROW_AUDIT
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
      schema-registry-url: "http://localhost:8081"
      consumer-group-id: blitzz
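The samples above omit the optional consumer tuning parameters `max-poll-records` and `max-poll-interval-ms`. The following minimal sketch shows how you might add them, assuming they sit under `connection` as in the parameter reference above; the values shown are simply the documented defaults:

cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    message-format: KCOP_MULTIROW_AUDIT
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
      max-poll-records: 1000
      max-poll-interval-ms: 900000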
V. Set up Extractor configuration #
To configure replication mode according to your requirements, specify your configuration in the Extractor configuration file. You can find a sample Extractor configuration file db2.yaml in the $REPLICANT_HOME/conf/src directory.
Configure snapshot mode #
For snapshot mode, you can make use of the following sample:
snapshot:
  threads: 16
  fetch-size-rows: 5_000
  _traceDBTasks: true
  fetch-schemas-from-system-tables: true

  per-table-config:
  - catalog: tpch
    schema: db2user
    tables:
      lineitem:
        row-identifier-key: [l_orderkey, l_linenumber]
For more information about the Extractor parameters for snapshot mode, see Snapshot mode.
Configure delta-snapshot mode #
For delta-snapshot mode, you can make use of the following sample:
delta-snapshot:
  #threads: 32
  #fetch-size-rows: 10_000
  #min-job-size-rows: 1_000_000
  max-jobs-per-chunk: 32
  _max-delete-jobs-per-chunk: 32
  delta-snapshot-key: last_update_time
  delta-snapshot-interval: 10
  delta-snapshot-delete-interval: 10
  _traceDBTasks: true
  replicate-deletes: false

  per-table-config:
  - catalog: tpch
    schema: db2user
    tables:
      # testTable:
      #   split-key: split-key-column
      #   split-hints:
      #     row-count-estimate: 100000
      #     split-key-min-value: 1
      #     split-key-max-value: 60_000
      #   delta-snapshot-key: delta-snapshot-key-column
      #   row-identifier-key: [col1, col2]
      #   update-key: [col1, col2]
      partsupp:
        split-key: partkey
For more information about the Extractor parameters for delta-snapshot mode, see Delta-snapshot mode.
Configure realtime mode #
For real-time replication, the start-position parameter specifying the starting log position follows a different structure for Db2 MQ and Db2 Kafka. For more information, see the following two samples:
Sample Extractor configurations for realtime mode #
realtime:
  #threads: 1
  #fetch-size-rows: 10000
  _traceDBTasks: true
  #fetch-interval-s: 0
  replicate-empty-string-as-null: true

  # start-position:
  #   commit-time: '2020-08-24 08:16:38.019002'
  #   idempotent-replay: false

  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    #table-name: replicate_io_cdc_heartbeat
    #column-name: timestamp
    interval-ms: 10000
In the preceding sample, notice the following details:

- The `start-position` parameter specifies the starting log position for real-time replication. For more information, see Db2 with MQ in Extractor Reference.
- If you set `message-format` to `DELIMITED`, set `replicate-empty-string-as-null` to `true`.
realtime:
  #threads: 1
  #fetch-size-rows: 10000
  _traceDBTasks: true
  #fetch-interval-s: 0
  replicate-empty-string-as-null: true

  # start-position:
  #   start-offset: LATEST
  #   idempotent-replay: false

  heartbeat:
    enable: true
    catalog: tpch
    schema: db2user
    #table-name: replicate_io_cdc_heartbeat
    #column-name: timestamp
    interval-ms: 10000
In the preceding sample, notice the following details:

- The `start-position` parameter specifies the starting log position for realtime replication. For more information, see Db2 with Kafka in Extractor Reference.
- If you set `message-format` to `DELIMITED`, set `replicate-empty-string-as-null` to `true`.
For more information about the Extractor parameters for realtime mode, see Realtime mode.