Debezium CDC format for Kafka #
Arcion Replicant uses the Debezium format to represent change data capture (CDC) events when Kafka is the target.
Overview #
Replicant specifically follows the format of the Debezium MySQL connector when Apache Kafka is the data target. For more information, see Debezium connector for MySQL.
Examples #
In this section, we’ll see what insert, update, and delete events look like in Debezium format for snapshot and realtime modes.
Change events in snapshot mode #
INSERT event #
{
"schema_1":{
"type":"struct",
"optional":false,
"field":null,
"name":"snapshot_connector.tpch_scale_0_01.t1.Key",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
}
]
},
"payload_1":{
"col1":"11"
},
"schema_2":{
"type":"struct",
"optional":false,
"field":null,
"name":"snapshot_connector.tpch_scale_0_01.t1.Envelope",
"fields":[
{
"type":"struct",
"optional":false,
"field":"before",
"name":"snapshot_connector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"after",
"name":"snapshot_connector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"source",
"name":"snapshot_connector",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"sever_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"query"
}
]
},
{
"type":"struct",
"optional":true,
"field":"transaction",
"name":null,
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
]
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
]
},
"payload_2":{
"before":null,
"after":{
"col2":"okay",
"col1":"11"
},
"source":{
"query":"INSERT INTO tpch_scale_0_01.t1(col1, col2) VALUES(11, okay)",
"thread":null,
"server_id":null,
"version":"any name of your choice",
"sequence":null,
"file":null,
"connector":"MYSQL",
"pos":null,
"name":"snapshot_connector",
"gtid":null,
"row":null,
"ts_ms":null,
"snapshot":true,
"db":"tpch_scale_0_01",
"table":"t1"
},
"op":"INSERT",
"ts_ms":null,
"transaction":null
}
}
Change events in realtime mode #
DELETE event #
{
"schema_1":{
"type":"struct",
"optional":false,
"field":null,
"name":"KafkaConnector.tpch_scale_0_01.t1.Key",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
}
]
},
"payload_1":{
"col1":"6"
},
"schema_2":{
"type":"struct",
"optional":false,
"field":null,
"name":"KafkaConnector.tpch_scale_0_01.t1.Envelope",
"fields":[
{
"type":"struct",
"optional":false,
"field":"before",
"name":"KafkaConnector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"after",
"name":"KafkaConnector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"source",
"name":"KafkaConnector",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"sever_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"query"
}
]
},
{
"type":"struct",
"optional":true,
"field":"transaction",
"name":null,
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
]
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
]
},
"payload_2":{
"before":{
"col2":"okay",
"col1":"6"
},
"after":null,
"source":{
"query":"DELETE FROM tpch_scale_0_01.t1 WHERE col1=6 AND col2=okay",
"thread":14,
"server_id":"1",
"version":"any name of your choice",
"sequence":null,
"file":"log-bin.000032 ",
"connector":"MYSQL",
"pos":26104,
"name":"KafkaConnector",
"gtid":"cd5e9510-faae-11ec-a4f6-0242ac110002:3718",
"row":null,
"ts_ms":1664189049403590,
"snapshot":false,
"db":"tpch_scale_0_01",
"table":"t1"
},
"op":"DELETE",
"ts_ms":1664189049403590,
"transaction":"cd5e9510-faae-11ec-a4f6-0242ac110002:3718"
}
}
INSERT event #
{
"schema_1":{
"type":"struct",
"optional":false,
"field":null,
"name":"KafkaConnector.tpch_scale_0_01.t1.Key",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
}
]
},
"payload_1":{
"col1":"6"
},
"schema_2":{
"type":"struct",
"optional":false,
"field":null,
"name":"KafkaConnector.tpch_scale_0_01.t1.Envelope",
"fields":[
{
"type":"struct",
"optional":false,
"field":"before",
"name":"KafkaConnector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"after",
"name":"KafkaConnector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"source",
"name":"KafkaConnector",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"sever_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"query"
}
]
},
{
"type":"struct",
"optional":true,
"field":"transaction",
"name":null,
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
]
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
]
},
"payload_2":{
"before":null,
"after":{
"col2":"okay",
"col1":"6"
},
"source":{
"query":"INSERT INTO tpch_scale_0_01.t1(col1, col2) VALUES(6, okay)",
"thread":14,
"server_id":"1",
"version":"any name of your choice",
"sequence":null,
"file":"log-bin.000032 ",
"connector":"MYSQL",
"pos":22791,
"name":"KafkaConnector",
"gtid":"cd5e9510-faae-11ec-a4f6-0242ac110002:3708",
"row":null,
"ts_ms":1664188958670693,
"snapshot":false,
"db":"tpch_scale_0_01",
"table":"t1"
},
"op":"INSERT",
"ts_ms":1664188958670693,
"transaction":"cd5e9510-faae-11ec-a4f6-0242ac110002:3708"
}
}
UPDATE event #
{
"schema_1":{
"type":"struct",
"optional":false,
"field":null,
"name":"KafkaConnector.tpch_scale_0_01.t1.Key",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
}
]
},
"payload_1":{
"col1":"1"
},
"schema_2":{
"type":"struct",
"optional":false,
"field":null,
"name":"KafkaConnector.tpch_scale_0_01.t1.Envelope",
"fields":[
{
"type":"struct",
"optional":false,
"field":"before",
"name":"KafkaConnector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"after",
"name":"KafkaConnector.tpch_scale_0_01.t1.Value",
"fields":[
{
"type":"int32",
"optional":false,
"field":"col1"
},
{
"type":"string",
"optional":true,
"field":"col2"
}
]
},
{
"type":"struct",
"optional":false,
"field":"source",
"name":"KafkaConnector",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"sever_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"query"
}
]
},
{
"type":"struct",
"optional":true,
"field":"transaction",
"name":null,
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
]
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
]
},
"payload_2":{
"before":{
"col2":"okay",
"col1":"1"
},
"after":{
"col2":"good",
"col1":"1"
},
"source":{
"query":"UPDATE tpch_scale_0_01.t1 SET col1=1 AND col2=good WHERE col1=1 AND col2=okay",
"thread":14,
"server_id":"1",
"version":"any name of your choice",
"sequence":null,
"file":"log-bin.000032 ",
"connector":"MYSQL",
"pos":46126,
"name":"KafkaConnector",
"gtid":"cd5e9510-faae-11ec-a4f6-0242ac110002:3778",
"row":null,
"ts_ms":1664189641169040,
"snapshot":false,
"db":"tpch_scale_0_01",
"table":"t1"
},
"op":"UPDATE",
"ts_ms":1664189641169040,
"transaction":"cd5e9510-faae-11ec-a4f6-0242ac110002:3778"
}
}