JSON CDC format for Kafka #
Arcion Replicant uses the JSON CDC format to represent change data capture (CDC) events for Kafka targets.
Overview #
Replicant supports JSON CDC format for the following sources:
To use JSON CDC format, set the global Applier parameter `replication-format` to `JSON` in your Applier configuration file.
DML message structure #
- Each message has a key and a value. The key uniquely identifies the change.
- Each message has a schema and a payload. The payload follows the schema definition.
- Replicant uses the primary key, unique key, or row identifier key columns to form the key structure. In the absence of a primary key, unique key, or row identifier key column, Replicant uses the string `"default"` as the key.
- Whenever a column that uniquely identifies a record is updated, Replicant generates a delete event and an insert event instead of an update event. The delete event deletes the existing record and the insert event inserts a new record.
- For each delete operation, Replicant generates a tombstone event. The tombstone event has the same key as the preceding delete event, and its value is set to the string `"default"`.
Examples #
In this section, we'll see what insert, update, and delete events look like in JSON CDC format in snapshot and realtime modes.
Change events in snapshot mode #
INSERT event #
Click to see sample key and value structure
Key structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Key",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
}
]
},
"payload": {
"r_regionkey": "0"
}
}
Value structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Envelope",
"fields": [
{
"type": "struct",
"optional": true,
"field": "before",
"name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": true,
"field": "after",
"name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": false,
"field": "source",
"name": "KAFKA_snapshot_connector",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
},
{
"type": "string",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": true,
"field": "file"
},
{
"type": "int64",
"optional": true,
"field": "pos"
},
{
"type": "int32",
"optional": true,
"field": "row"
}
]
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
]
},
"payload": {
"before": null,
"after": {
"r_regionkey": "0",
"r_comment": "Test_Replication",
"r_name": "AFRICA"
},
"source": {
"schema": null,
"query": "INSERT INTO tpch_scale_0_01.region(r_regionkey, r_name, r_comment) VALUES(0, AFRICA, Test_Replication)",
"thread": null,
"server_id": null,
"version": "5.7.24",
"file": null,
"connector": "MYSQL",
"pos": null,
"name": "KAFKA_snapshot_connector",
"gtid": null,
"row": null,
"ts_ms": null,
"db": "tpch_scale_0_01",
"table": "region",
"snapshot": "true"
},
"op": "r",
"ts_ms": null
}
}
Change events in realtime mode #
INSERT event #
Click to see sample key and value structure
Key structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch_scale_0_01.region.Key",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
}
]
},
"payload": {
"r_regionkey": "10"
}
}
Value structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch_scale_0_01.region.Envelope",
"fields": [
{
"type": "struct",
"optional": true,
"field": "before",
"name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": true,
"field": "after",
"name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": false,
"field": "source",
"name": "KAFKA_Connector",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
},
{
"type": "string",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": true,
"field": "file"
},
{
"type": "int64",
"optional": true,
"field": "pos"
},
{
"type": "int32",
"optional": true,
"field": "row"
}
]
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
]
},
"payload": {
"before": null,
"after": {
"r_regionkey": "10",
"r_comment": "ReplicationWorks",
"r_name": "Test_nation"
},
"source": {
"schema": null,
"query": "INSERT INTO tpch_scale_0_01.region(r_regionkey, r_name, r_comment) VALUES(10, Test_nation, ReplicationWorks)",
"thread": 3908,
"server_id": "1",
"version": "5.7.24",
"file": "log-bin.000003",
"connector": "MYSQL",
"pos": 96749554,
"name": "KAFKA_Connector",
"gtid": null,
"row": 1,
"ts_ms": 1678883495000,
"db": "tpch_scale_0_01",
"table": "region",
"snapshot": "false"
},
"op": "c",
"ts_ms": 1678863695725
}
}
UPDATE event #
Click to see sample key and value structure
Key structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch_scale_0_01.region.Key",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
}
]
},
"payload": {
"r_regionkey": "10"
}
}
Value structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch_scale_0_01.region.Envelope",
"fields": [
{
"type": "struct",
"optional": true,
"field": "before",
"name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": true,
"field": "after",
"name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": false,
"field": "source",
"name": "KAFKA_Connector",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
},
{
"type": "string",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": true,
"field": "file"
},
{
"type": "int64",
"optional": true,
"field": "pos"
},
{
"type": "int32",
"optional": true,
"field": "row"
}
]
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
]
},
"payload": {
"before": {
"r_regionkey": "10",
"r_comment": "ReplicationWorks",
"r_name": "Test_nation"
},
"after": {
"r_regionkey": "10",
"r_comment": "Test_Replication",
"r_name": "Test_nation"
},
"source": {
"schema": null,
"query": "UPDATE tpch_scale_0_01.region SET r_regionkey=10 AND r_name=Test_nation AND r_comment=Test_Replication WHERE r_regionkey=10 AND r_name=Test_nation AND r_comment=ReplicationWorks",
"thread": 1643,
"server_id": "1",
"version": "5.7.24",
"file": "log-bin.000003",
"connector": "MYSQL",
"pos": 96759878,
"name": "KAFKA_Connector",
"gtid": null,
"row": 1,
"ts_ms": 1678883820000,
"db": "tpch_scale_0_01",
"table": "region",
"snapshot": "false"
},
"op": "u",
"ts_ms": 1678864020928
}
}
DELETE event #
Click to see sample key and value structure
Key structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch.region.Key",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
}
]
},
"payload": {
"r_regionkey": "10"
}
}
Value structure #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch_scale_0_01.region.Envelope",
"fields": [
{
"type": "struct",
"optional": true,
"field": "before",
"name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": true,
"field": "after",
"name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
},
{
"type": "string",
"optional": false,
"field": "r_name"
},
{
"type": "string",
"optional": true,
"field": "r_comment"
}
]
},
{
"type": "struct",
"optional": false,
"field": "source",
"name": "KAFKA_Connector",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
},
{
"type": "string",
"optional": true,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": true,
"field": "file"
},
{
"type": "int64",
"optional": true,
"field": "pos"
},
{
"type": "int32",
"optional": true,
"field": "row"
}
]
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
]
},
"payload": {
"before": {
"r_regionkey": "10",
"r_comment": "Test_Replication",
"r_name": "Test_nation"
},
"after": null,
"source": {
"schema": null,
"query": "DELETE FROM tpch_scale_0_01.region WHERE r_regionkey=10 AND r_name=Test_nation AND r_comment=Test_Replication",
"thread": 1643,
"server_id": "1",
"version": "5.7.24",
"file": "log-bin.000003",
"connector": "MYSQL",
"pos": 96770857,
"name": "KAFKA_Connector",
"gtid": null,
"row": 1,
"ts_ms": 1678887161000,
"db": "tpch_scale_0_01",
"table": "region",
"snapshot": "false"
},
"op": "d",
"ts_ms": 1678867362465
}
}
Tombstone event #
{
"schema": {
"type": "struct",
"optional": false,
"name": "KAFKA_Connector.tpch.region.Key",
"fields": [
{
"type": "int32",
"optional": false,
"field": "r_regionkey"
}
]
},
"payload": {
"r_regionkey": "10"
}
}
The value for the preceding tombstone key structure is the "default" string:
"default"