Debezium CDC format for Kafka

Debezium CDC format for Kafka #

Arcion Replicant uses the Debezium format to represent CDC change events when Kafka is the replication target.

Overview #

Specifically, when Apache Kafka is the data target, Replicant follows the event format of the Debezium MySQL connector. For more information, see the Debezium connector for MySQL documentation.

Examples #

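This section shows what INSERT, UPDATE, and DELETE events look like in Debezium format in snapshot and realtime modes. The snapshot-mode example shows an INSERT event. The realtime-mode examples show DELETE, INSERT, and UPDATE events.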

Change events in snapshot mode #

INSERT event #

{
   "schema_1":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"snapshot_connector.tpch_scale_0_01.t1.Key",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"col1"
         }
      ]
   },
   "payload_1":{
      "col1":"11"
   },
   "schema_2":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"snapshot_connector.tpch_scale_0_01.t1.Envelope",
      "fields":[
         {
            "type":"struct",
            "optional":false,
            "field":"before",
            "name":"snapshot_connector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"after",
            "name":"snapshot_connector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"source",
            "name":"snapshot_connector",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ]
         },
         {
            "type":"struct",
            "optional":true,
            "field":"transaction",
            "name":null,
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ]
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         }
      ]
   },
   "payload_2":{
      "before":null,
      "after":{
         "col2":"okay",
         "col1":"11"
      },
      "source":{
         "query":"INSERT INTO tpch_scale_0_01.t1(col1, col2) VALUES(11, okay)",
         "thread":null,
         "server_id":null,
         "version":"any name of your choice",
         "sequence":null,
         "file":null,
         "connector":"MYSQL",
         "pos":null,
         "name":"snapshot_connector",
         "gtid":null,
         "row":null,
         "ts_ms":null,
         "snapshot":true,
         "db":"tpch_scale_0_01",
         "table":"t1"
      },
      "op":"INSERT",
      "ts_ms":null,
      "transaction":null
   }
}

Change events in realtime mode #

DELETE event #

{
   "schema_1":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"KafkaConnector.tpch_scale_0_01.t1.Key",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"col1"
         }
      ]
   },
   "payload_1":{
      "col1":"6"
   },
   "schema_2":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"KafkaConnector.tpch_scale_0_01.t1.Envelope",
      "fields":[
         {
            "type":"struct",
            "optional":false,
            "field":"before",
            "name":"KafkaConnector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"after",
            "name":"KafkaConnector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"source",
            "name":"KafkaConnector",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ]
         },
         {
            "type":"struct",
            "optional":true,
            "field":"transaction",
            "name":null,
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ]
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         }
      ]
   },
   "payload_2":{
      "before":{
         "col2":"okay",
         "col1":"6"
      },
      "after":null,
      "source":{
         "query":"DELETE FROM tpch_scale_0_01.t1 WHERE col1=6 AND col2=okay",
         "thread":14,
         "server_id":"1",
         "version":"any name of your choice",
         "sequence":null,
         "file":"log-bin.000032 ",
         "connector":"MYSQL",
         "pos":26104,
         "name":"KafkaConnector",
         "gtid":"cd5e9510-faae-11ec-a4f6-0242ac110002:3718",
         "row":null,
         "ts_ms":1664189049403590,
         "snapshot":false,
         "db":"tpch_scale_0_01",
         "table":"t1"
      },
      "op":"DELETE",
      "ts_ms":1664189049403590,
      "transaction":"cd5e9510-faae-11ec-a4f6-0242ac110002:3718"
   }
}

INSERT event #

{
   "schema_1":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"KafkaConnector.tpch_scale_0_01.t1.Key",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"col1"
         }
      ]
   },
   "payload_1":{
      "col1":"6"
   },
   "schema_2":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"KafkaConnector.tpch_scale_0_01.t1.Envelope",
      "fields":[
         {
            "type":"struct",
            "optional":false,
            "field":"before",
            "name":"KafkaConnector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"after",
            "name":"KafkaConnector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"source",
            "name":"KafkaConnector",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ]
         },
         {
            "type":"struct",
            "optional":true,
            "field":"transaction",
            "name":null,
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ]
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         }
      ]
   },
   "payload_2":{
      "before":null,
      "after":{
         "col2":"okay",
         "col1":"6"
      },
      "source":{
         "query":"INSERT INTO tpch_scale_0_01.t1(col1, col2) VALUES(6, okay)",
         "thread":14,
         "server_id":"1",
         "version":"any name of your choice",
         "sequence":null,
         "file":"log-bin.000032 ",
         "connector":"MYSQL",
         "pos":22791,
         "name":"KafkaConnector",
         "gtid":"cd5e9510-faae-11ec-a4f6-0242ac110002:3708",
         "row":null,
         "ts_ms":1664188958670693,
         "snapshot":false,
         "db":"tpch_scale_0_01",
         "table":"t1"
      },
      "op":"INSERT",
      "ts_ms":1664188958670693,
      "transaction":"cd5e9510-faae-11ec-a4f6-0242ac110002:3708"
   }
}

UPDATE event #

{
   "schema_1":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"KafkaConnector.tpch_scale_0_01.t1.Key",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"col1"
         }
      ]
   },
   "payload_1":{
      "col1":"1"
   },
   "schema_2":{
      "type":"struct",
      "optional":false,
      "field":null,
      "name":"KafkaConnector.tpch_scale_0_01.t1.Envelope",
      "fields":[
         {
            "type":"struct",
            "optional":false,
            "field":"before",
            "name":"KafkaConnector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"after",
            "name":"KafkaConnector.tpch_scale_0_01.t1.Value",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"col1"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"col2"
               }
            ]
         },
         {
            "type":"struct",
            "optional":false,
            "field":"source",
            "name":"KafkaConnector",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"sequence"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"server_id"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"gtid"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"file"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"pos"
               },
               {
                  "type":"int32",
                  "optional":false,
                  "field":"row"
               },
               {
                  "type":"string",
                  "optional":true,
                  "field":"query"
               }
            ]
         },
         {
            "type":"struct",
            "optional":true,
            "field":"transaction",
            "name":null,
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ]
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         }
      ]
   },
   "payload_2":{
      "before":{
         "col2":"okay",
         "col1":"1"
      },
      "after":{
         "col2":"good",
         "col1":"1"
      },
      "source":{
         "query":"UPDATE tpch_scale_0_01.t1 SET col1=1, col2=good WHERE col1=1 AND col2=okay",
         "thread":14,
         "server_id":"1",
         "version":"any name of your choice",
         "sequence":null,
         "file":"log-bin.000032 ",
         "connector":"MYSQL",
         "pos":46126,
         "name":"KafkaConnector",
         "gtid":"cd5e9510-faae-11ec-a4f6-0242ac110002:3778",
         "row":null,
         "ts_ms":1664189641169040,
         "snapshot":false,
         "db":"tpch_scale_0_01",
         "table":"t1"
      },
      "op":"UPDATE",
      "ts_ms":1664189641169040,
      "transaction":"cd5e9510-faae-11ec-a4f6-0242ac110002:3778"
   }
}