JSON CDC format for Kafka #

Arcion Replicant uses the JSON CDC format to represent CDC changes for the Kafka target.

Overview #

Replicant supports JSON CDC format for the following sources:

To use JSON CDC format, set the global Applier parameter replication-format to JSON in your Applier configuration file.

DML message structure #

  1. Each message has a key and a value. The key uniquely identifies the change.
  2. Each message has a schema and a payload. The payload follows the schema definition.
  3. Replicant uses the primary key, unique key, or row identifier key columns to form the key structure. In the absence of a primary key, unique key, or row identifier key column, Replicant uses the "default" string as the key.
  4. Whenever a column that uniquely identifies a record is updated, Replicant generates a delete event and an insert event instead of an update event. The delete event deletes the existing record and the insert event inserts the new record.
  5. For each delete operation, Replicant generates a tombstone event. The tombstone event has the same key as the preceding delete event, and its value is set to "default".
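
Rules 3 and 4 above can be sketched in Python as follows. This is only an illustration of the described behavior, not Replicant's implementation: the function names `build_key` and `events_for_update` and the tuple layout are hypothetical, and the op codes `u`, `d`, and `c` are taken from the sample events in this document. The tombstone from rule 5 is left out for brevity.

```python
DEFAULT_KEY = "default"  # fallback key when a table has no key columns


def build_key(row, key_columns):
    """Return the key payload for a row (rule 3)."""
    if not key_columns:
        return DEFAULT_KEY
    return {col: row[col] for col in key_columns}


def events_for_update(before, after, key_columns):
    """Return (key, op, before, after) tuples for one source UPDATE (rule 4)."""
    old_key = build_key(before, key_columns)
    new_key = build_key(after, key_columns)
    if old_key != new_key:
        # A key column changed: emit a delete for the old row and an
        # insert for the new row instead of a single update.
        return [(old_key, "d", before, None), (new_key, "c", None, after)]
    return [(new_key, "u", before, after)]


old = {"r_regionkey": 10, "r_name": "Test_nation"}
new = {"r_regionkey": 11, "r_name": "Test_nation"}
for key, op, _, _ in events_for_update(old, new, ["r_regionkey"]):
    print(op, key)
```

Note that with no key columns every row maps to the same "default" key, so this sketch can never detect a key change for such tables.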

Examples #

In this section, we'll see what insert, update, and delete events look like in JSON CDC format for snapshot and realtime modes.

Change events in snapshot mode #

INSERT event #

Key structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Key",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "r_regionkey"
      }
    ]
  },
  "payload": {
    "r_regionkey": "0"
  }
}
Value structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Envelope",
    "fields": [
      {
        "type": "struct",
        "optional": true,
        "field": "before",
        "name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": true,
        "field": "after",
        "name": "KAFKA_snapshot_connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": false,
        "field": "source",
        "name": "KAFKA_snapshot_connector",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          },
          {
            "type": "string",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": true,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "row"
          }
        ]
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ]
  },
  "payload": {
    "before": null,
    "after": {
      "r_regionkey": "0",
      "r_comment": "Test_Replication",
      "r_name": "AFRICA"
    },
    "source": {
      "schema": null,
      "query": "INSERT INTO tpch_scale_0_01.region(r_regionkey, r_name, r_comment) VALUES(0, AFRICA, Test_Replication)",
      "thread": null,
      "server_id": null,
      "version": "5.7.24",
      "file": null,
      "connector": "MYSQL",
      "pos": null,
      "name": "KAFKA_snapshot_connector",
      "gtid": null,
      "row": null,
      "ts_ms": null,
      "db": "tpch_scale_0_01",
      "table": "region",
      "snapshot": "true"
    },
    "op": "r",
    "ts_ms": null
  }
}
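
In the sample above, the schema declares `r_regionkey` as `int32`, while the payload carries it as the string "0". A consumer that needs native types may want to coerce payload values using the schema. The following is a minimal Python sketch under that assumption; the helper names `field_types` and `coerce_row` are hypothetical, and the event is a trimmed copy of the sample.

```python
import json

INT_TYPES = {"int8", "int16", "int32", "int64"}


def field_types(envelope_schema, struct_name):
    """Map column name -> declared type for the 'before' or 'after' struct."""
    for field in envelope_schema["fields"]:
        if field.get("field") == struct_name:
            return {f["field"]: f["type"] for f in field["fields"]}
    raise KeyError(struct_name)


def coerce_row(event, struct_name="after"):
    """Return the row with integer columns converted from strings to int."""
    row = event["payload"][struct_name]
    if row is None:  # for example, "before" is null on inserts
        return None
    types = field_types(event["schema"], struct_name)
    return {
        col: int(val) if val is not None and types.get(col) in INT_TYPES else val
        for col, val in row.items()
    }


# Trimmed copy of the snapshot INSERT value shown above.
event = json.loads("""
{
  "schema": {"type": "struct", "fields": [
    {"type": "struct", "optional": true, "field": "after", "fields": [
      {"type": "int32", "optional": false, "field": "r_regionkey"},
      {"type": "string", "optional": false, "field": "r_name"}
    ]}
  ]},
  "payload": {"before": null,
              "after": {"r_regionkey": "0", "r_name": "AFRICA"},
              "op": "r"}
}
""")
print(coerce_row(event))
```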

Change events in realtime mode #

INSERT event #

Key structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch_scale_0_01.region.Key",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "r_regionkey"
      }
    ]
  },
  "payload": {
    "r_regionkey": "10"
  }
}
Value structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch_scale_0_01.region.Envelope",
    "fields": [
      {
        "type": "struct",
        "optional": true,
        "field": "before",
        "name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": true,
        "field": "after",
        "name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": false,
        "field": "source",
        "name": "KAFKA_Connector",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          },
          {
            "type": "string",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": true,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "row"
          }
        ]
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ]
  },
  "payload": {
    "before": null,
    "after": {
      "r_regionkey": "10",
      "r_comment": "ReplicationWorks",
      "r_name": "Test_nation"
    },
    "source": {
      "schema": null,
      "query": "INSERT INTO tpch_scale_0_01.region(r_regionkey, r_name, r_comment) VALUES(10, Test_nation, ReplicationWorks)",
      "thread": 3908,
      "server_id": "1",
      "version": "5.7.24",
      "file": "log-bin.000003",
      "connector": "MYSQL",
      "pos": 96749554,
      "name": "KAFKA_Connector",
      "gtid": null,
      "row": 1,
      "ts_ms": 1678883495000,
      "db": "tpch_scale_0_01",
      "table": "region",
      "snapshot": "false"
    },
    "op": "c",
    "ts_ms": 1678863695725
  }
}

UPDATE event #

Key structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch_scale_0_01.region.Key",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "r_regionkey"
      }
    ]
  },
  "payload": {
    "r_regionkey": "10"
  }
}
Value structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch_scale_0_01.region.Envelope",
    "fields": [
      {
        "type": "struct",
        "optional": true,
        "field": "before",
        "name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": true,
        "field": "after",
        "name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": false,
        "field": "source",
        "name": "KAFKA_Connector",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          },
          {
            "type": "string",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": true,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "row"
          }
        ]
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ]
  },
  "payload": {
    "before": {
      "r_regionkey": "10",
      "r_comment": "ReplicationWorks",
      "r_name": "Test_nation"
    },
    "after": {
      "r_regionkey": "10",
      "r_comment": "Test_Replication",
      "r_name": "Test_nation"
    },
    "source": {
      "schema": null,
      "query": "UPDATE tpch_scale_0_01.region SET r_regionkey=10 AND r_name=Test_nation AND r_comment=Test_Replication WHERE r_regionkey=10 AND r_name=Test_nation AND r_comment=ReplicationWorks",
      "thread": 1643,
      "server_id": "1",
      "version": "5.7.24",
      "file": "log-bin.000003",
      "connector": "MYSQL",
      "pos": 96759878,
      "name": "KAFKA_Connector",
      "gtid": null,
      "row": 1,
      "ts_ms": 1678883820000,
      "db": "tpch_scale_0_01",
      "table": "region",
      "snapshot": "false"
    },
    "op": "u",
    "ts_ms": 1678864020928
  }
}
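
An update event carries both the `before` and `after` row images, so a consumer can work out which columns changed. The sketch below shows one way to do that in Python; the function name `changed_columns` is hypothetical, and the payload is a trimmed copy of the sample above.

```python
def changed_columns(payload):
    """Return {column: (old, new)} for columns that differ in an update payload."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }


# Trimmed copy of the UPDATE payload shown above.
payload = {
    "before": {"r_regionkey": "10", "r_comment": "ReplicationWorks", "r_name": "Test_nation"},
    "after": {"r_regionkey": "10", "r_comment": "Test_Replication", "r_name": "Test_nation"},
    "op": "u",
}
print(changed_columns(payload))
# {'r_comment': ('ReplicationWorks', 'Test_Replication')}
```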

DELETE event #

Key structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch.region.Key",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "r_regionkey"
      }
    ]
  },
  "payload": {
    "r_regionkey": "10"
  }
}
Value structure #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch_scale_0_01.region.Envelope",
    "fields": [
      {
        "type": "struct",
        "optional": true,
        "field": "before",
        "name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": true,
        "field": "after",
        "name": "KAFKA_Connector.tpch_scale_0_01.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": false,
        "field": "source",
        "name": "KAFKA_Connector",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          },
          {
            "type": "string",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": true,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "row"
          }
        ]
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ]
  },
  "payload": {
    "before": {
      "r_regionkey": "10",
      "r_comment": "Test_Replication",
      "r_name": "Test_nation"
    },
    "after": null,
    "source": {
      "schema": null,
      "query": "DELETE FROM tpch_scale_0_01.region WHERE r_regionkey=10 AND r_name=Test_nation AND r_comment=Test_Replication",
      "thread": 1643,
      "server_id": "1",
      "version": "5.7.24",
      "file": "log-bin.000003",
      "connector": "MYSQL",
      "pos": 96770857,
      "name": "KAFKA_Connector",
      "gtid": null,
      "row": 1,
      "ts_ms": 1678887161000,
      "db": "tpch_scale_0_01",
      "table": "region",
      "snapshot": "false"
    },
    "op": "d",
    "ts_ms": 1678867362465
  }
}
Tombstone event #
{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "KAFKA_Connector.tpch.region.Key",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "r_regionkey"
      }
    ]
  },
  "payload": {
    "r_regionkey": "10"
  }
}

The value for the preceding tombstone key structure is the "default" string:

"default"
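
Putting the event types together, a consumer might apply messages to a local copy of the table roughly as sketched below. This is a hedged Python illustration, not part of Replicant: the function name `apply_message` is hypothetical, it assumes the tombstone value arrives already decoded as the string "default", and it handles only the op codes that appear in the samples in this document (`r`, `c`, `u`, `d`).

```python
import json

TOMBSTONE_VALUE = "default"


def apply_message(table, key, value):
    """Apply one decoded (key, value) message to an in-memory table."""
    # The key is either the "default" string or a schema/payload envelope.
    row_key = key if isinstance(key, str) else json.dumps(key["payload"], sort_keys=True)
    if value == TOMBSTONE_VALUE:
        # Tombstone: the preceding delete event already removed the row,
        # so this pop is only a safeguard.
        table.pop(row_key, None)
        return
    payload = value["payload"]
    op = payload["op"]
    if op in ("r", "c", "u"):  # snapshot read, insert, update
        table[row_key] = payload["after"]
    elif op == "d":  # delete
        table.pop(row_key, None)
    else:
        raise ValueError(f"unexpected op: {op!r}")


key = {"payload": {"r_regionkey": "10"}}
row = {"r_regionkey": "10", "r_name": "Test_nation"}
table = {}
apply_message(table, key, {"payload": {"op": "c", "before": None, "after": row}})
apply_message(table, key, {"payload": {"op": "d", "before": row, "after": None}})
apply_message(table, key, "default")
print(table)
```

Tables without a primary key, unique key, or row identifier share the single "default" key, so this keyed approach cannot tell their rows apart.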