Схема взаимодействия (интерсепторы на поставщиках/потребителях, message.timestamp.type=LogAppendTime)#

Последовательность выполнения#

В данном разделе представлены сценарии использования для пользователя с ролью Администратор.

Настройка коннектора компонента SEDR#

  1. Отключить передачу timestamp сообщения из кластера-источника (эти настройки будут использованы в шаге 4):

    transfer.timestamp = false
    
  2. Настроить интерсептор для консьюмера компонента SEDR (эти настройки будут использованы в шаге 4). Настройки указаны в примере.

  3. Настроить интерсептор для продюсера компонента SEDR (эти настройки будут использованы в шаге 4). Настройки указаны в примере.

  4. Создать дистрибутив коннектора, добавив в additional_config настройки интерсептора из предыдущих шагов. Пример дистрибутива:

release.yaml
connectors:
- topics: REPLICATOR.LATENCY.INPUT
  output.topic: REPLICATOR.LATENCY.OUTPUT
  tasks: 1
  additional_config: |
    "transfer.timestamp": "false",
    "consumer.override.interceptor.classes": "ru.sbt.ss.kafka.clients.timestamp.TimestampConsumerInterceptor",
    "consumer.override.interceptor.timestamp.message.id.header": "Latency.MessageId",
    "consumer.override.interceptor.timestamp.message.time.header": "Latency.SourceCluster.LogAppendTime",
    "consumer.override.interceptor.timestamp.consume.time.header": "Latency.Replicator.ConsumeTime",
    "consumer.override.interceptor.timestamp.reporters": "buffered-logger, window-metrics",
    "consumer.override.interceptor.timestamp.message.id.headers": "Latency.MessageId",
    "consumer.override.interceptor.timestamp.latency.headers": "Latency.ProducerSystem.CreateTime, Latency.SourceCluster.LogAppendTime",
    "consumer.override.interceptor.timestamp.logger.name": "latency.REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector.consumer",
    "consumer.override.interceptor.timestamp.logger.buffer.size": "1",
    "consumer.override.interceptor.timestamp.window-metrics.header.name": "Latency.SourceCluster.LogAppendTime",
    "consumer.override.interceptor.timestamp.window-metrics.window.s": "60",
    "consumer.override.interceptor.timestamp.window-metrics.limits": "avg=0, max=0",
    "consumer.override.interceptor.timestamp.window-metrics.limits.logger": "latency.REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector.consumer",
    "producer.override.interceptor.classes": "ru.sbt.ss.kafka.clients.timestamp.TimestampProducerInterceptor",
    "producer.override.interceptor.timestamp.reporters": "buffered-logger",
    "producer.override.interceptor.timestamp.reporter.mode": "timestamp",
    "producer.override.interceptor.timestamp.include.message.id.header": "false",
    "producer.override.interceptor.timestamp.create.time.header": "Latency.Replicator.CreateTime",
    "producer.override.interceptor.timestamp.acknowledge.header": "Latency.AckTime",
    "producer.override.interceptor.timestamp.logger.name": "latency.REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector.producer",
    "producer.override.interceptor.timestamp.logger.buffer.size": "1",
  name: REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector
  worker: synapse_replicator
  json_template: replicator.json.j2

Настройка продюсера поставщика#

  1. Добавить актуальную версию интерсептора в зависимости проекта.

  2. Добавить настройки интерсептора к настройкам продюсера. Настройки указаны в примере.

Настройка консьюмера потребителя#

  1. Добавить актуальную версию интерсептора в зависимости проекта.

  2. Добавить настройки интерсептора к настройкам консьюмера. Настройки указаны в примере.

Результат#

Выполнены настройки коннектора компонента SEDR, продюсера поставщика и консьюмера потребителя.