Destination Kafka

The extracted replicant-cli directory is referred to as $REPLICANT_HOME in the following steps.

I. Set up Connection Configuration

  1. From $REPLICANT_HOME, navigate to the sample Kafka connection configuration file:
    vi conf/conn/kafka.yaml
    
  2. Make the necessary changes as follows:
    type: KAFKA
    
    username: 'replicant' #Replace replicant with the username of your user that connects to your Kafka server
    password: 'Replicant#123' #Replace Replicant#123 with your user's password
    
    #ssl:
    #  enable: true
    #  trust-store:
    #      path: "<path>/kafka.server.truststore.jks"
    #      password: "<password>"
    
    #Multiple Kafka brokers can be specified using the format below:
    brokers:
       broker1: #Replace broker1 with your broker name
           host: localhost #Replace localhost with your broker's host
           port: 19092 #Replace 19092 with your broker's port
       broker2: #Replace broker2 with your broker name
           host: localhost #Replace localhost with your broker's host
           port: 29092 #Replace 29092 with your broker's port
    max-retries: #Number of times any failed operation on the system is re-attempted
    retry-wait-duration-ms: #Duration in milliseconds Replicant waits before the next retry of a failed operation
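
    As an illustration, a filled-in connection file for a two-broker cluster with SSL enabled might look like the sketch below. The broker hostnames, ports, and retry values are placeholder assumptions rather than recommended settings, and the truststore path and password are left as placeholders to be replaced with your own.

    type: KAFKA

    username: 'replicant'        #Placeholder username
    password: 'Replicant#123'    #Placeholder password

    ssl:
      enable: true
      trust-store:
        path: "<path>/kafka.server.truststore.jks"
        password: "<password>"

    brokers:
      broker1:
        host: kafka-1.example.com    #Hypothetical host
        port: 9092                   #Hypothetical port
      broker2:
        host: kafka-2.example.com    #Hypothetical host
        port: 9092                   #Hypothetical port
    max-retries: 5                   #Assumed value, for illustration only
    retry-wait-duration-ms: 1000     #Assumed value, for illustration only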
    

II. Set up Applier Configuration

  1. From $REPLICANT_HOME, navigate to the sample Kafka applier configuration file:

    vi conf/dst/kafka.yaml
    
  2. The configuration file has two parts:

    • Parameters related to snapshot mode.
    • Parameters related to realtime mode.

    For snapshot mode, the following Kafka-specific parameters are available:

    • replication-factor[v21.12.02.6]: Replication factor for data topics. In a Kafka cluster setup, this defines how many times each Kafka topic partition is replicated across brokers. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

    • num-shards[v21.12.02.6]: Number of partitions per data topic. By default, this is set to the number of applier threads, so that each applier thread can write to its own partition of a Kafka topic for the best possible scaling.

    • shard-key[v21.12.02.6]: Shard key to be used for partitioning data topics.

    • shard-function[v21.12.02.6]: Sharding function used to determine the partition assignment, based on shard-key, for all data topics. Allowed values are MOD and NONE.

      Default: By default, this parameter is set to NONE, meaning Kafka will use its own partitioning algorithm.

    • kafka-compression-type[v20.05.12.3]: Compression type. Allowed values are lz4, snappy, gzip, and none.

      Default: By default, this parameter is set to lz4.

    • kafka-batch-size-in-bytes[v20.05.12.3]: Batch size for the Kafka producer.

      Default: By default, this parameter is set to 100000.

    • kafka-buffer-memory-size-in-bytes[v20.05.12.3]: Memory allocated to the Kafka client to store unsent messages.

      Default: By default, this parameter is set to 67108864.

    • kafka-linger-ms[v20.05.12.3]: Time in milliseconds to wait before sending, which gives Kafka batches more time to fill.

      Default: By default, this parameter is set to 10.

    • kafka-interceptor-classes[v21.09.17.2]: Specifies the list of interceptor classes. This parameter corresponds to Kafka’s ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.

    • per-table-config[v20.12.04.6]: Allows you to specify various properties for target tables on a per-table basis.

      • replication-factor[v21.12.02.6]: Replication factor for data topics. In a Kafka cluster setup, this defines how many times each Kafka topic partition is replicated across brokers. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

      • num-shards[v21.12.02.6]: Number of partitions per data topic. By default, this is set to the number of applier threads, so that each applier thread can write to its own partition of a Kafka topic for the best possible scaling.

      • shard-key[v21.12.02.6]: Shard key to be used for partitioning the data topic.

      • shard-function[v21.12.02.6]: Sharding function used to determine the partition assignment, based on shard-key, for all data topics. Allowed values are MOD and NONE.

        Default: By default, this parameter is set to NONE, meaning Kafka will use its own partitioning algorithm.

    Below is a sample config for snapshot mode:

    snapshot:
     threads: 16 #Specify the maximum number of threads Replicant should use for writing to the target
    
     replication-factor: 1
     schema-dictionary: SCHEMA_DUMP  # Allowed values: POJO | SCHEMA_DUMP | NONE
     kafka-compression-type: lz4
     kafka-batch-size-in-bytes: 100000
     kafka-buffer-memory-size-in-bytes: 67108864
     kafka-linger-ms: 10
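
    The sharding parameters and per-table-config described above can be combined in the snapshot section. The following is a minimal sketch only: it assumes that per-table-config in snapshot mode uses the same layout as the commented realtime sample in this file, that the tables have id and custkey columns, and the shard counts are illustrative rather than recommended.

    snapshot:
      threads: 16

      replication-factor: 1
      num-shards: 16         #Matches the thread count, which is also the documented default
      shard-key: id          #Assumes the tables have an id column
      shard-function: MOD    #With NONE, Kafka uses its own partitioning algorithm

      per-table-config:      #Layout assumed to mirror the realtime sample
      - tables:
          io_blitzz_customer:
            shard-key: custkey
            num-shards: 8
            shard-function: MOD

    Tables not listed under per-table-config would fall back to the section-level values.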
    

    If you want to operate in realtime mode, you can use a realtime section to specify your configuration. The following Kafka-specific parameters are available:

    • replication-factor[v21.12.02.6]: Replication factor for CDC topics. In a Kafka cluster setup, this defines how many times each Kafka topic partition is replicated across brokers. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

    • num-shards[v21.12.02.6]: Number of partitions to be created for all CDC log topics.

    • shard-key[v21.12.02.6]: Shard key to be used for partitioning CDC logs in all target topics.

    • shard-function[v21.12.02.6]: Sharding function used to determine the partition assignment, based on shard-key, for all CDC log topics. Allowed values are MOD and NONE.

      Default: By default, this parameter is set to NONE, meaning Kafka will use its own partitioning algorithm.

    • kafka-compression-type[v20.05.12.3]: Compression type. Allowed values are lz4, snappy, gzip, and none.

      Default: By default, this parameter is set to lz4.

    • kafka-batch-size-in-bytes[v20.05.12.3]: Batch size for the Kafka producer.

      Default: By default, this parameter is set to 100000.

    • kafka-buffer-memory-size-in-bytes[v20.05.12.3]: Memory allocated to the Kafka client to store unsent messages.

      Default: By default, this parameter is set to 67108864.

    • kafka-linger-ms[v20.05.12.3]: Time in milliseconds to wait before sending, which gives Kafka batches more time to fill.

      Default: By default, this parameter is set to 10.

    • kafka-interceptor-classes[v21.09.17.2]: Specifies the list of interceptor classes. This parameter corresponds to Kafka’s ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.

    • per-table-config[v20.12.04.6]: Allows you to specify various properties for target tables on a per-table basis.

      • replication-factor[v21.12.02.6]: Replication factor for CDC topics. In a Kafka cluster setup, this defines how many times each Kafka topic partition is replicated across brokers. Replicant passes this value to Kafka, and Kafka drives the partition-level replication.

      • num-shards[v21.12.02.6]: Number of partitions per data topic. By default, this is set to the number of applier threads, so that each applier thread can write to its own partition of a Kafka topic for the best possible scaling.

      • shard-key[v21.12.02.6]: Shard key to be used for partitioning data topic.

      • shard-function[v21.12.02.7]: Sharding function to be used to deduce the partition allotment based on shard-key for all data topics. Values allowed are MOD and NONE.

        Default: By default, this parameter is set to NONE, meaning Kafka will use its own partitioning algorithm.

    Below is a sample config for realtime mode:

    
    realtime:
      txn-size-rows: 1000
      before-image-format: ALL  # Allowed values : KEY, ALL
      after-image-format: ALL   # Allowed values : UPDATED, ALL
      # shard-key: id
      # num-shards: 1
      # shard-function: MOD
    
      # per-table-config:
      # - tables:
      #     io_blitzz_nation:
      #       shard-key: id
      #       num-shards: 16
      #       shard-function: NONE
      #     io_blitzz_region:
      #       shard-key: id
      #     io_blitzz_customer:
      #       shard-key: custkey
      #       num-shards: 16
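
    For comparison, a realtime section with sharding enabled and the Kafka producer parameters spelled out might look like the sketch below. The kafka-* values shown are the documented defaults; the replication factor, shard count, and the id shard key are illustrative assumptions, not recommendations.

    realtime:
      txn-size-rows: 1000
      before-image-format: ALL
      after-image-format: ALL

      replication-factor: 1    #Assumed value, for illustration only
      num-shards: 4            #Assumed value, for illustration only
      shard-key: id            #Assumes the tables have an id column
      shard-function: MOD

      kafka-compression-type: lz4                    #Documented default
      kafka-batch-size-in-bytes: 100000              #Documented default
      kafka-buffer-memory-size-in-bytes: 67108864    #Documented default
      kafka-linger-ms: 10                            #Documented default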
    

For a detailed explanation of the configuration parameters in the applier file, see the Applier Reference.