Source IBM Db2 with Kafka/MQ #
This page describes how to set up Source Db2 with Kafka and MQ.
Obtain the dependency files #
Arcion Replicant requires the Db2 JDBC driver as a dependency. If you use IBM MQ to store CDC logs, you must also obtain the MQ Java client files.
Download the appropriate versions of the JDBC driver and MQ client files. After downloading, put the JAR files inside the lib/ directory of your Replicant self-hosted download folder.
I. Check permissions #
Verify that the user possesses the necessary permissions on the source Db2 server to perform replication. For details about the required permissions, see IBM Db2 permissions.
II. Enable CDC replication for Db2 MQ #
To enable CDC-based replication from the Source Db2 MQ server, follow the instructions in Enabling CDC replication for Db2 MQ.
III. Create the heartbeat table #
For CDC replication, you must create the heartbeat table on the source database:
CREATE TABLE "tpch"."replicate_io_cdc_heartbeat"("timestamp" BIGINT NOT NULL,
CONSTRAINT "cdc_heartbeat_id_default_default" PRIMARY KEY("timestamp"))
IV. Set up connection configuration #
Specify your connection details to Replicant with a connection configuration file. You can find a sample connection configuration file db2_src.yaml in the $REPLICANT_HOME/conf/conn directory.
Configure Db2 server connection #
To connect to Db2 with basic username and password authentication, specify your credentials in plain form in the connection configuration file, as in the following sample:
type: DB2
database: tpch #Name of the catalog from which the tables are to be replicated
host: localhost
port: 50002
username: replicant
password: "Replicant#123"
max-connections: 30
max-retries: 10
retry-wait-duration-ms: 1000
proxy-source: false
If you set proxy-source to true, Replicant doesn't attempt to connect to the source database. You can enable it for real-time mode if the CDC logs exist in separate storage from the source database.
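For example, a real-time-only setup where the CDC logs live in MQ or Kafka rather than on the Db2 host might look like the following sketch, which simply reuses the values from the preceding sample with proxy-source enabled:
type: DB2
database: tpch #Name of the catalog from which the tables are to be replicated
host: localhost
port: 50002
username: replicant
password: "Replicant#123"
max-connections: 30
max-retries: 10
retry-wait-duration-ms: 1000
proxy-source: true #Replicant reads only from the CDC log storage and skips the source connection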
Configure CDC logs and monitoring #
For CDC-based replication from the source Db2 server, you can choose between IBM MQ and Kafka as the storage for Db2 logs. To specify the storage type, set the cdc-log-storage parameter to one of the following:
- MQ
- KAFKA_TRANSACTIONAL
- KAFKA_EVENTUAL
All CDC log and monitoring configurations start with the parent field cdc-log-config.
To use IBM MQ for CDC logs, set cdc-log-storage to MQ.
Connection details #
- mqueue-connections
  If you enable real-time replication and use IBM MQ for CDC logs, specify the MQ connection information in this field. To connect to MQ, you can either specify credentials in plain text form or use SSL.

  Specify MQ connection credentials in plain text with the following parameters:
  - host: The host running the MQ queue manager.
  - port: The port number on the MQ queue manager host.
  - username: The username to connect to the MQ queue manager.
  - password: The password associated with username.

  To use an SSL connection instead, specify the SSL details under the ssl field. The following details are required:
  - trust-store: Details about the TrustStore, with the following required parameters:
    - path: Path to the TrustStore.
    - password: Password for the TrustStore.
  - key-store: Details about the KeyStore, with the following required parameters:
    - path: Path to the KeyStore.
    - password: Password for the KeyStore.
    You must specify this parameter if you have two-way authentication enabled on MQ.
  - ssl-cipher-suite: The encryption algorithm, based on the configuration of the MQ Manager, for example TLS_RSA_WITH_AES_128_CBC_SHA256.

  The following parameters complete the MQ connection:
  - queue-manager: Name of the queue manager to connect to.
  - queue-channel: The name of the channel to connect to on the queue manager.
  - queues: List of IBM MQ queues to connect to. Each queue takes the following parameters:
    - name: Name of the IBM MQ queue.
    - message-format: The format of messages from IBM MQ: XML or DELIMITED.
    - message-type [v21.02.01.8]: The type of message from IBM MQ: ROW or TRANSACTION. The message type depends on the value of MESSAGE CONTENT TYPE that you set using PubQMap. For R as MESSAGE CONTENT TYPE, set message-type to ROW. For T as MESSAGE CONTENT TYPE, set message-type to TRANSACTION.
    - lob-send-option: Specifies whether LOB columns are inlined or received in separate MQ messages: INLINE or SEPARATE.
- backup-mqueue-connection [v20.04.06.1]
  Connection details for the backup MQ manager. Specifying this configuration allows Replicant to seamlessly fall back to the backup MQ manager when the primary MQ manager goes down. You can configure all parameters for the backup MQ manager in the same way as the primary MQ manager described above.
Sample configuration #
cdc-log-config:
cdc-log-storage: MQ
mqueue-connections:
queue-conn1:
host: localhost
port: 1450
queue-manager: CDC_QM
queue-channel: CDC_Q_CHANNEL
username: queue_manager
password: queue_manager
queues:
- name: CDC_LOG_Q
message-format: XML
message-type: ROW
lob-send-option: INLINE
backup-mqueue-connection:
host: localhost
port: 1460
queue-manager: CDC_QM_BACKUP
queue-channel: CDC_Q_BACKUP_CHANNEL
username: backup_queue_manager
password: backup_queue_manager
queues:
- name: CDC_LOG_BACKUP_Q
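The preceding sample uses plain-text credentials. If you connect to MQ over SSL instead, the ssl block described earlier might look like the following sketch; the TrustStore and KeyStore paths and passwords are placeholder values, and key-store is only needed when two-way authentication is enabled on MQ:
cdc-log-config:
  cdc-log-storage: MQ
  mqueue-connections:
    queue-conn1:
      host: localhost
      port: 1450
      queue-manager: CDC_QM
      queue-channel: CDC_Q_CHANNEL
      ssl:
        trust-store:
          path: "/path/to/truststore.jks" #placeholder path
          password: "truststore-password" #placeholder password
        key-store:
          path: "/path/to/keystore.jks" #placeholder path
          password: "keystore-password" #placeholder password
        ssl-cipher-suite: TLS_RSA_WITH_AES_128_CBC_SHA256
      queues:
      - name: CDC_LOG_Q
        message-format: XML
        message-type: ROW
        lob-send-option: INLINE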
To use Kafka for CDC logs, set cdc-log-storage to one of the following types:
- KAFKA_TRANSACTIONAL
- KAFKA_EVENTUAL
Kafka log storage overview #
- For KAFKA_TRANSACTIONAL as cdc-log-storage, the following assumptions apply, based on the value of message-format:
  - If you set message-format to XML or DELIMITED, then the key of a record is the MQ MessageId and the value is the MQMessage in XML or DELIMITED format.
  - If you set message-format to KCOP_MULTIROW_AUDIT, then cdc-log-topic corresponds to the topic name of the COMMIT-STREAM topic associated with the subscription. Replicant replicates the topic in a transactionally consistent manner.
- For KAFKA_EVENTUAL as cdc-log-storage, the topic name follows the format cdc-log-topic-prefix.<table_name>. This format follows the IBM IIDR naming scheme for Kafka topics. See the sample configurations in the following sections for a better understanding.

  By default, IIDR creates a Kafka topic using the following naming convention:

  <datastore_name>.<subscription_name>.<database>.<schema>.<table>

  In this case, set cdc-log-topic-prefix to the following:

  <datastore_name>.<subscription_name>.<database>.<schema>
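As a concrete illustration, assume a hypothetical IIDR datastore kafka_ds with a subscription sub1 that replicates tables from database TESTDB and schema DB2USER. IIDR would then create topics such as kafka_ds.sub1.TESTDB.DB2USER.LINEITEM, and the matching prefix setting would be:
cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    cdc-log-topic-prefix: "kafka_ds.sub1.TESTDB.DB2USER" #placeholder names for datastore, subscription, database, and schema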
Connection details #
The connection details for Kafka start with the parent kafka-connection field. It contains the following parameters:
- cdc-log-topic: The Kafka topic containing the Db2 CDC log. Applicable when using KAFKA_TRANSACTIONAL as cdc-log-storage.
- cdc-log-topic-prefix [v20.12.04.7]: The common prefix for all Kafka topics undergoing replication. Applicable when you set cdc-log-storage to either KAFKA_EVENTUAL or KAFKA_AVRO.
- cdc-log-topic-prefix-list [v21.02.01.19]: List of mappings from common prefixes to source tables. Each mapping contains the following parameters:
  - cdc-log-topic-prefix: The common prefix for all Kafka IIDR topics.
  - tables: An array of table names.
- message-format: Format of messages received from Kafka: XML, DELIMITED, or KCOP_MULTIROW_AUDIT.
- message-type: Specifies the message type: ROW or TRANSACTION. This parameter applies only when you set message-format to XML.
- lob-send-option: Specifies whether LOB columns are inlined or received in separate MQ messages: INLINE or SEPARATE.
- connection: Connection configuration to connect to Kafka. It contains the following parameters:
  - max-poll-records: The maximum number of records returned in a single call to poll(). For more information, see the max.poll.records consumer configuration. Default: 1000.
  - max-poll-interval-ms: The maximum delay in milliseconds between invocations of poll() when using consumer group management. For more information, see the max.poll.interval.ms consumer configuration. Default: 900000.
Sample configuration for KAFKA_TRANSACTIONAL #
cdc-log-config:
cdc-log-storage: KAFKA_TRANSACTIONAL
kafka-connection:
cdc-log-topic: cdc_log_topic
message-format: XML
message-type: ROW
lob-send-option: INLINE
connection:
brokers:
broker1:
host: localhost
port: 19092
Sample configuration for KAFKA_EVENTUAL #
cdc-log-config:
cdc-log-storage: KAFKA_EVENTUAL
kafka-connection:
cdc-log-topic-prefix: ""
cdc-log-topic-prefix-list:
    - cdc-log-topic-prefix: ""
tables: [table1, table2]
- cdc-log-topic-prefix: ""
tables: [table3, table4]
message-format: KCOP_MULTIROW_AUDIT
connection:
brokers:
broker1:
host: localhost
port: 19092
schema-registry-url: "http://localhost:8081"
consumer-group-id: blitzz
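Neither sample sets the consumer tuning parameters max-poll-records and max-poll-interval-ms described above. A minimal sketch that makes them explicit under the connection block might look like the following; the broker values repeat the sample above, and 1000 and 900000 are the documented defaults:
cdc-log-config:
  cdc-log-storage: KAFKA_EVENTUAL
  kafka-connection:
    message-format: KCOP_MULTIROW_AUDIT
    connection:
      brokers:
        broker1:
          host: localhost
          port: 19092
      max-poll-records: 1000 #documented default
      max-poll-interval-ms: 900000 #documented default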
V. Set up Extractor configuration #
To configure replication mode according to your requirements, specify your configuration in the Extractor configuration file. You can find a sample Extractor configuration file db2.yaml in the $REPLICANT_HOME/conf/src directory.
Configure snapshot mode #
For snapshot mode, you can use the following sample:
snapshot:
threads: 16
fetch-size-rows: 5_000
_traceDBTasks: true
fetch-schemas-from-system-tables: true
per-table-config:
- catalog: tpch
schema: db2user
tables:
lineitem:
row-identifier-key: [l_orderkey, l_linenumber]
For more information about the Extractor parameters for snapshot
mode, see Snapshot mode.
Configure delta-snapshot mode #
For delta-snapshot mode, you can use the following sample:
delta-snapshot:
#threads: 32
#fetch-size-rows: 10_000
#min-job-size-rows: 1_000_000
max-jobs-per-chunk: 32
_max-delete-jobs-per-chunk: 32
delta-snapshot-key: last_update_time
delta-snapshot-interval: 10
delta-snapshot-delete-interval: 10
_traceDBTasks: true
replicate-deletes: false
per-table-config:
- catalog: tpch
schema: db2user
tables:
# testTable
# split-key: split-key-column
# split-hints:
# row-count-estimate: 100000
# split-key-min-value: 1
# split-key-max-value: 60_000
# delta-snapshot-key: delta-snapshot-key-column
# row-identifier-key: [col1, col2]
# update-key: [col1, col2]
partsupp:
split-key: partkey
For more information about the Extractor parameters for delta-snapshot
mode, see Delta-snapshot mode.
Configure realtime mode #
For real-time replication, the start-position
parameter specifying the starting log position follows a different structure for Db2 MQ and Db2 Kafka. For more information, see the following two samples:
Sample Extractor configurations for realtime mode #
realtime:
#threads: 1
#fetch-size-rows: 10000
_traceDBTasks: true
#fetch-interval-s: 0
replicate-empty-string-as-null: true
# start-position:
# commit-time: '2020-08-24 08:16:38.019002'
# idempotent-replay: false
heartbeat:
enable: true
catalog: tpch
schema: db2user
#table-name: replicate_io_cdc_heartbeat
#column-name: timestamp
interval-ms: 10000
In the preceding sample, notice the following details:
- The start-position parameter specifies the starting log position for real-time replication. For more information, see Db2 with MQ in Extractor Reference.
- If you set message-format to DELIMITED, set replicate-empty-string-as-null to true.
realtime:
#threads: 1
#fetch-size-rows: 10000
_traceDBTasks: true
#fetch-interval-s: 0
replicate-empty-string-as-null: true
# start-position:
# start-offset: LATEST
# idempotent-replay: false
heartbeat:
enable: true
catalog: tpch
schema: db2user
#table-name: replicate_io_cdc_heartbeat
#column-name: timestamp
interval-ms: 10000
In the sample above, notice the following details:
- The start-position parameter specifies the starting log position for real-time replication. For more information, see Db2 with Kafka in Extractor Reference.
- If you set message-format to DELIMITED, set replicate-empty-string-as-null to true.
For more information about the Extractor parameters for realtime mode, see Realtime mode.