Destination Redis Streams #

This page describes how to load data in realtime into Redis Streams, an append-only log data structure.

The following steps refer to the extracted Arcion self-hosted CLI download as the $REPLICANT_HOME directory.

I. Set up connection configuration #

Specify your Redis connection details to Replicant with a connection configuration file. You can find a sample connection configuration file redis_stream.yaml in the $REPLICANT_HOME/conf/conn directory.

For connecting to Redis, you can choose between two methods for an authenticated connection:

  • Using basic username and password authentication
  • Using SSL

Connect with username and password #

To connect to Redis with basic username and password authentication, specify your credentials in the connection configuration file. The exact configuration depends on whether you have SSL encryption enabled for the Redis connection.

Without SSL encryption for the Redis connection, specify your configuration in the following manner:

type: REDIS_STREAM

host: HOSTNAME
port: PORT_NUMBER
username: 'USERNAME'
password: 'PASSWORD'

max-connections: 30
max-retries: 10

Replace the following:

  • HOSTNAME: the Redis server hostname
  • PORT_NUMBER: the port number of the Redis host
  • USERNAME: the username to connect to the Redis server
  • PASSWORD: the password associated with USERNAME

In the preceding sample:

  • max-connections specifies the maximum number of connections Replicant can open in Redis.
  • max-retries specifies the number of times any failed operation on the system will be re-attempted.

Feel free to change these two values as you need.

You can enable data encryption for the Redis connection using SSL. In that case, you need to specify the TrustStore holding the CA certificate along with the username and password. For example:

type: REDIS_STREAM

host: HOSTNAME
port: PORT_NUMBER

username: 'USERNAME'
password: 'PASSWORD'

ssl:
  enable: true
  trust-store:
    path: "PATH_TO_TRUSTSTORE"
    password: "TRUSTSTORE_PASSWORD"
    ssl-store-type: 'TRUSTSTORE_TYPE'
max-connections: 30
max-retries: 10

Replace the following:

  • HOSTNAME: the Redis server hostname
  • PORT_NUMBER: the port number of the Redis host
  • USERNAME: the username to connect to the Redis server
  • PASSWORD: the password associated with USERNAME
  • PATH_TO_TRUSTSTORE: path to the TrustStore
  • TRUSTSTORE_PASSWORD: the TrustStore password
  • TRUSTSTORE_TYPE: the TrustStore type—for example, PKCS12

In the preceding sample:

  • max-connections specifies the maximum number of connections Replicant can open in Redis.
  • max-retries specifies the number of times any failed operation on the system will be re-attempted.

Feel free to change these two values as you need.

Connect using SSL #

If you prefer both client authentication and data encryption using SSL, specify both TrustStore and KeyStore details in the connection configuration file. For example:

type: REDIS_STREAM

host: HOSTNAME
port: PORT_NUMBER

ssl:
  enable: true
  trust-store:
    path: "PATH_TO_TRUSTSTORE"
    password: "TRUSTSTORE_PASSWORD"
    ssl-store-type: 'TRUSTSTORE_TYPE'
  key-store:
    path: "PATH_TO_KEYSTORE"
    password: "KEYSTORE_PASSWORD"
    ssl-store-type: 'KEYSTORE_TYPE'

max-connections: 30
max-retries: 10

Replace the following:

  • HOSTNAME: the Redis server hostname
  • PORT_NUMBER: the port number of the Redis host
  • PATH_TO_TRUSTSTORE: path to the TrustStore
  • TRUSTSTORE_PASSWORD: the TrustStore password
  • TRUSTSTORE_TYPE: the TrustStore type—for example, PKCS12
  • PATH_TO_KEYSTORE: path to the KeyStore
  • KEYSTORE_PASSWORD: the KeyStore password
  • KEYSTORE_TYPE: the KeyStore type—for example, PKCS12

In the preceding sample:

  • max-connections specifies the maximum number of connections Replicant can open in Redis.
  • max-retries specifies the number of times any failed operation on the system will be re-attempted.

Feel free to change these two values as you need.
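The three connection variants above differ only in which keys are present. As a rough illustration, the following Python sketch classifies a connection configuration that has already been parsed from YAML into a dictionary. The function name auth_mode and the returned labels are hypothetical and are not part of Replicant. The sketch only mirrors the keys shown in the samples above.

```python
def auth_mode(conn: dict) -> str:
    """Classify a parsed connection config by the keys it contains.

    Hypothetical helper: the labels it returns are illustrative only.
    """
    ssl = conn.get("ssl") or {}
    ssl_on = bool(ssl.get("enable"))
    has_credentials = "username" in conn and "password" in conn

    # Every SSL sample above includes a TrustStore.
    if ssl_on and "trust-store" not in ssl:
        raise ValueError("ssl.enable is true but ssl.trust-store is missing")
    # A KeyStore means the client authenticates itself with SSL.
    if ssl_on and "key-store" in ssl:
        return "ssl-client-auth"
    if has_credentials and ssl_on:
        return "password-with-ssl-encryption"
    if has_credentials:
        return "password"
    raise ValueError("neither credentials nor an SSL key-store are configured")


if __name__ == "__main__":
    sample = {
        "type": "REDIS_STREAM",
        "host": "HOSTNAME",
        "port": 6379,
        "username": "USERNAME",
        "password": "PASSWORD",
        "max-connections": 30,
        "max-retries": 10,
    }
    print(auth_mode(sample))
```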

II. Configure mapper file (optional) #

If you want to define data mapping from your source to Redis Streams, specify the mapping rules in the mapper file. For more information on how to define the mapping rules and run Replicant CLI with the mapper file, see Mapper Configuration.

When mapping source object names to Redis streams, you can choose between two delimiters for stream names. For more information, see Delimiter in Kafka topic and Redis stream names.

III. Set up Applier configuration #

To configure replication mode according to your requirements, specify your configuration in the Applier configuration file. You can find a sample Applier configuration file redis_stream.yaml in the $REPLICANT_HOME/conf/dst directory. For example:

snapshot:
  threads: 16

realtime:
  threads: 16
  split-stream: false

For more information on running Replicant in different modes, see Running Replicant.

You can configure Redis Streams to operate in either snapshot or realtime mode.

Configure snapshot mode #

To operate in snapshot mode, specify your configuration under the snapshot section of the configuration file. For example:

snapshot:
  threads: 32
  batch-size-rows: 10_000
  txn-size-rows: 10_000

Additional snapshot parameters #

log-row-level-errors #

true or false.

During snapshot replication, if a given batch fails, Replicant retries the failed rows. You can set this parameter to true if you want to log the failed rows in the trace.log file.

For more information about the Applier parameters for snapshot mode, see Snapshot Mode.

Configure realtime mode #

To operate in realtime mode, specify your configuration under the realtime section of the configuration file. For example:

realtime:
  threads: 16
  replay-consistency: EVENTUAL
  txn-size-rows: 10_000
  batch-size-rows: 10_000

Additional realtime parameters #

split-stream #

true or false.

If true, Replicant creates separate streams for snapshot data and CDC data. If false, a single stream holds both snapshot and CDC data. split-stream is a global parameter for realtime mode, so you can’t change it on a per-table basis.

Default: true.

For more information about the configuration parameters for realtime mode, see Realtime Mode.

Design considerations #

Supported platforms #

Arcion Replicant supports the following sources for Redis Streams as target:

For MySQL, you can also enable Global Transaction ID (GTID) based logging and enforce GTID consistency if Redis messages require them. To do so, add the following to your MySQL option file my.cnf:

gtid_mode=ON 
enforce-gtid-consistency=ON

Failures and rollbacks #

A Redis stream acts like an append-only log in which each message has its own Stream entry ID, and you can delete a message by its Stream entry ID. However, Redis transactions don't support rollback. So if some rows in a batch fail, Redis doesn't roll back the rows already written in that transaction. Because of this behavior, Replicant follows this strategy:

  • For snapshot, Replicant identifies the failed rows in a given batch and retries those failed rows.
  • For realtime, since Replicant must maintain the order, Replicant tries to undo the committed rows in a given batch and retries the entire batch.
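The two strategies above can be pictured with a small simulation. This is only a sketch of the idea, not Replicant's implementation. It assumes a hypothetical write(row) callback that returns a Stream entry ID on success and None on failure, and a hypothetical delete(entry_id) callback that removes a written entry. It also assumes that max_retries counts re-attempts after the first try.

```python
from typing import Callable, Hashable, List, Optional

Write = Callable[[str], Optional[Hashable]]
Delete = Callable[[Hashable], None]


def apply_snapshot_batch(rows: List[str], write: Write, max_retries: int) -> List[str]:
    """Snapshot strategy: retry only the rows that failed.

    Returns the rows that still fail after all attempts.
    """
    pending = list(rows)
    for _ in range(max_retries + 1):
        # Keep only the rows whose write failed in this pass.
        pending = [row for row in pending if write(row) is None]
        if not pending:
            break
    return pending


def apply_realtime_batch(rows: List[str], write: Write, delete: Delete,
                         max_retries: int) -> bool:
    """Realtime strategy: undo written rows, then retry the whole batch in order."""
    for _ in range(max_retries + 1):
        written_ids = []
        for row in rows:
            entry_id = write(row)
            if entry_id is None:
                break
            written_ids.append(entry_id)
        else:
            return True  # every row in the batch was written in order
        # A row failed: delete what this attempt wrote so order is preserved.
        for entry_id in written_ids:
            delete(entry_id)
    return False
```

The snapshot variant can leave rows out of their original order, which is acceptable there. The realtime variant trades extra deletes for preserved ordering.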

Replicant’s behavior after reaching max-retries #

After reaching the maximum number of re-attempts specified in max-retries, Replicant’s behavior depends on the replication mode and the type of transactional consistency.

During snapshot phase

The skip-tables-on-failures Applier configuration parameter defaults to true. Therefore, Replicant excludes the table from the replication rather than stopping the Replicant process by throwing an exception. This behavior prevents the rest of the tables from going into an inconsistent state. Using the dynamic reinitialization feature, you can add the tables Replicant excludes from replication.

You can also disable skip-tables-on-failures. In that case, Replicant throws an exception and performs snapshot recovery when you resume replication with the --resume option.

During realtime phase with eventual replay consistency

The skip-tables-on-failures Applier configuration parameter defaults to true. Therefore, Replicant excludes the table from the replication rather than stopping the Replicant process by throwing an exception. This behavior prevents the rest of the tables from going into an inconsistent state. Using the dynamic reinitialization feature, you can add the tables Replicant excludes from replication.

You can also disable skip-tables-on-failures. In that case, Replicant throws an exception and performs real-time recovery when you resume replication with the --resume option.
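The effect of skip-tables-on-failures in the snapshot phase and in the realtime phase with eventual consistency can be sketched as follows. The names replicate_tables, apply_table, and ReplicationStopped are hypothetical and serve only to illustrate the control flow described above. The sketch assumes max_retries counts re-attempts after the first try.

```python
from typing import Callable, List


class ReplicationStopped(Exception):
    """Hypothetical exception standing in for Replicant stopping the process."""


def replicate_tables(tables: List[str], apply_table: Callable[[str], bool],
                     max_retries: int,
                     skip_tables_on_failures: bool = True) -> List[str]:
    """Apply each table; return the tables excluded from replication."""
    skipped = []
    for table in tables:
        for _ in range(max_retries + 1):
            if apply_table(table):
                break
        else:
            # All attempts failed for this table.
            if not skip_tables_on_failures:
                raise ReplicationStopped(table)
            skipped.append(table)  # the other tables keep replicating
    return skipped
```

With the default setting, the returned list corresponds to the tables you would add back later through dynamic reinitialization.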

During realtime phase with global replay consistency

Replicant dumps the Stream entry IDs for the messages it couldn’t delete programmatically in the following file:

$REPLICANT_HOME/data/<replication_id>/bad_rows/replicate_io_indoubt_txn_log

You need to clean up those entries manually and resume the replication run. You can use the following command for cleaning up the entries:

redis-cli XDEL STREAM_NAME STREAM_ENTRY_ID_FROM_FILE [STREAM_ENTRY_ID_FROM_FILE ...]

Replace STREAM_NAME and STREAM_ENTRY_ID_FROM_FILE with the corresponding stream names and IDs.
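If the file lists many entries, you might prefer to generate the cleanup commands rather than type them. The following Python sketch builds redis-cli XDEL command lines from a stream name and a list of Stream entry IDs. It doesn't parse the in-doubt log itself, because the layout of that file isn't described here. The helper name xdel_commands and the chunk_size parameter are assumptions made for illustration.

```python
import shlex
from typing import Iterable, List


def xdel_commands(stream_name: str, entry_ids: Iterable[str],
                  chunk_size: int = 100) -> List[str]:
    """Build redis-cli XDEL command lines, chunk_size IDs per command.

    XDEL accepts several space-separated entry IDs after the stream name.
    """
    ids = [entry_id.strip() for entry_id in entry_ids if entry_id.strip()]
    return [
        # shlex.join quotes each argument so the line is safe to paste in a shell.
        shlex.join(["redis-cli", "XDEL", stream_name, *ids[start:start + chunk_size]])
        for start in range(0, len(ids), chunk_size)
    ]


if __name__ == "__main__":
    # The stream name and IDs here are made-up examples.
    for command in xdel_commands("tpch.region", ["1681129889000-0", "1681129889001-0"]):
        print(command)
```

Review the generated commands before running them, since XDEL permanently removes the entries.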

Transactional consistency in realtime mode #

Realtime mode supports the following two consistency modes.

GLOBAL
Replicant carries out real-time replication with global transactional consistency. A single stream holds CDC logs in transaction order.
EVENTUAL
Replicant carries out real-time replication with eventual consistency. Replicant also carries out replay per table and a stream object exists for each table.

Under the realtime section of the Applier configuration file, set the replay-consistency parameter to either GLOBAL or EVENTUAL.
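To make the difference between the two modes concrete, the following Python sketch groups a sequence of change events the way each mode is described above: one stream in transaction order for GLOBAL, one stream per table for EVENTUAL. The function route_events and the stream name all_tables are hypothetical. Replicant's actual stream names depend on your mapping and delimiter settings.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[str, str]  # (table name, change description)


def route_events(events: List[Event], replay_consistency: str) -> Dict[str, List[Event]]:
    """Group events (given in source transaction order) into streams."""
    if replay_consistency == "GLOBAL":
        # One stream keeps every change in transaction order.
        return {"all_tables": list(events)}  # hypothetical stream name
    if replay_consistency == "EVENTUAL":
        # One stream per table; order is kept within each table only.
        streams: Dict[str, List[Event]] = defaultdict(list)
        for table, change in events:
            streams[table].append((table, change))
        return dict(streams)
    raise ValueError(f"unknown replay-consistency: {replay_consistency}")
```

In the EVENTUAL case, a consumer reading two table streams can't recover the original interleaving of changes across tables, which is the trade-off for replaying each table independently.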

DML message structure #

  1. Each message contains a key and a value. The key uniquely identifies the change.
  2. Each message contains a schema and a payload. The payload follows the schema definition.
  3. Replicant uses the primary key, unique key, or row identifier key column to form the key structure. In the absence of a primary key, unique key, or row identifier key column, Replicant uses the "default" string for the key.
  4. Whenever a column that uniquely identifies a record is updated, Replicant generates a delete event and an insert event instead of an update event. The delete event deletes the existing record and the insert event inserts the new record.
  5. For each delete operation, Replicant generates a tombstone event. Replicant assigns the event the same key as the previous delete operation and sets the value to "default".
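The rules above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Replicant's code: the helper names, the kind labels, and the position of the tombstone directly after the delete event are all assumptions.

```python
from typing import Any, Dict, List, Sequence, Union

Row = Dict[str, Any]
Key = Union[str, Row]


def message_key(row: Row, key_columns: Sequence[str]) -> Key:
    """Form the key from the identifying columns, or "default" if there are none."""
    if not key_columns:
        return "default"
    return {column: row[column] for column in key_columns}


def events_for_update(before: Row, after: Row,
                      key_columns: Sequence[str]) -> List[Dict[str, Any]]:
    """Return the events for one source UPDATE, following rules 3 to 5."""
    old_key = message_key(before, key_columns)
    new_key = message_key(after, key_columns)
    if old_key == new_key:
        return [{"key": new_key, "kind": "update", "before": before, "after": after}]
    # An identifying column changed: emit delete + insert instead of update.
    return [
        {"key": old_key, "kind": "delete", "before": before, "after": None},
        # The tombstone reuses the delete's key and carries the value "default".
        {"key": old_key, "kind": "tombstone", "value": "default"},
        {"key": new_key, "kind": "insert", "before": None, "after": after},
    ]
```

For example, changing r_regionkey from 10 to 11 on a table keyed by r_regionkey yields three events, while changing only r_comment yields a single update event.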

Key structure #

{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "REDIS_STREAM_Connector.tpch.region.Key",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "r_regionkey"
      }
    ]
  },
  "payload": {
    "r_regionkey": 10
  }
}

Value structure #

{
  "schema": {
    "type": "struct",
    "optional": false,
    "name": "REDIS_STREAM_Connector.tpch.region.Envelope",
    "fields": [
      {
        "type": "struct",
        "optional": true,
        "field": "before",
        "name": "REDIS_STREAM_Connector.tpch.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": true,
        "field": "after",
        "name": "REDIS_STREAM_Connector.tpch.region.Value",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "r_regionkey"
          },
          {
            "type": "string",
            "optional": false,
            "field": "r_name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "r_comment"
          }
        ]
      },
      {
        "type": "struct",
        "optional": false,
        "field": "source",
        "name": "REDIS_STREAM_Connector",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          },
          {
            "type": "string",
            "optional": true,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": true,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": true,
            "field": "row"
          }
        ]
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ]
  },
  "payload": {
    "before": {
      "r_regionkey": 10,
      "r_name": "Test_nation",
      "r_comment": "ReplicationWorks"
    },
    "after": {
      "r_regionkey": 10,
      "r_name": "Test_nation",
      "r_comment": "TestReplication"
    },
    "source": {
      "version": "5.7.41",
      "connector": "MYSQL",
      "name": "REDIS_STREAM_Connector",
      "ts_ms": 1681129889000,
      "db": "tpch",
      "schema": null,
      "table": "region",
      "snapshot": "false",
      "server_id": "1",
      "gtid": null,
      "file": "mysql-log.000019",
      "pos": 4436,
      "row": 5,
      "thread": 69,
      "query": "UPDATE tpch.region SET r_regionkey=10 AND r_name=Test_nation AND r_comment=TestReplication WHERE r_regionkey=10 AND r_name=Test_nation AND r_comment=ReplicationWorks"
    },
    "op": "u",
    "ts_ms": 1681110090233
  }
}
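A consumer usually cares about which columns changed. The following Python sketch compares the before and after images in a value like the preceding sample. It assumes the consumer has already obtained the value as a JSON string. How that string is laid out inside a Redis stream entry isn't covered here, and the helper name changed_columns is hypothetical.

```python
import json
from typing import Any, Dict, Tuple


def changed_columns(message_value: str) -> Dict[str, Tuple[Any, Any]]:
    """Map each changed column to its (before, after) pair."""
    payload = json.loads(message_value)["payload"]
    # "before" or "after" may be null, for example on inserts or deletes.
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


if __name__ == "__main__":
    # Trimmed version of the sample value above: schema omitted, payload shortened.
    value = json.dumps({
        "payload": {
            "before": {"r_regionkey": 10, "r_name": "Test_nation",
                       "r_comment": "ReplicationWorks"},
            "after": {"r_regionkey": 10, "r_name": "Test_nation",
                      "r_comment": "TestReplication"},
            "op": "u",
        }
    })
    print(changed_columns(value))
```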