Схема взаимодействия (интерсепторы на поставщиках/потребителях, message.timestamp.type=LogAppendTime)#
Последовательность выполнения#

В данном разделе представлены сценарии использования для пользователя с ролью Администратор.
Настройка коннектора компонента SEDR#
Отключить передачу timestamp сообщения из кластера-источника (эти настройки будут использованы в шаге 4):
transfer.timestamp = falseНастроить интерсептор для консьюмера компонента SEDR (эти настройки будут использованы в шаге 4). Настройки указаны в примере.
Настроить интерсептор для продюсера компонента SEDR (эти настройки будут использованы в шаге 4). Настройки указаны в примере.
Создать дистрибутив коннектора, добавив в
additional_configнастройки интерсептора из предыдущих шагов. Пример дистрибутива:
release.yaml
connectors:
- topics: REPLICATOR.LATENCY.INPUT
output.topic: REPLICATOR.LATENCY.OUTPUT
tasks: 1
additional_config: |
"transfer.timestamp": "false",
"consumer.override.interceptor.classes": "ru.sbt.ss.kafka.clients.timestamp.TimestampConsumerInterceptor",
"consumer.override.interceptor.timestamp.message.id.header": "Latency.MessageId",
"consumer.override.interceptor.timestamp.message.time.header": "Latency.SourceCluster.LogAppendTime",
"consumer.override.interceptor.timestamp.consume.time.header": "Latency.Replicator.ConsumeTime",
"consumer.override.interceptor.timestamp.reporters": "buffered-logger, window-metrics",
"consumer.override.interceptor.timestamp.message.id.headers": "Latency.MessageId",
"consumer.override.interceptor.timestamp.latency.headers": "Latency.ProducerSystem.CreateTime, Latency.SourceCluster.LogAppendTime",
"consumer.override.interceptor.timestamp.logger.name": "latency.REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector.consumer",
"consumer.override.interceptor.timestamp.logger.buffer.size": "1",
"consumer.override.interceptor.timestamp.window-metrics.header.name": "Latency.SourceCluster.LogAppendTime",
"consumer.override.interceptor.timestamp.window-metrics.window.s": "60",
"consumer.override.interceptor.timestamp.window-metrics.limits": "avg=0, max=0",
"consumer.override.interceptor.timestamp.window-metrics.limits.logger": "latency.REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector.consumer",
"producer.override.interceptor.classes": "ru.sbt.ss.kafka.clients.timestamp.TimestampProducerInterceptor",
"producer.override.interceptor.timestamp.reporters": "buffered-logger",
"producer.override.interceptor.timestamp.reporter.mode": "timestamp",
"producer.override.interceptor.timestamp.include.message.id.header": "false",
"producer.override.interceptor.timestamp.create.time.header": "Latency.Replicator.CreateTime",
"producer.override.interceptor.timestamp.acknowledge.header": "Latency.AckTime",
"producer.override.interceptor.timestamp.logger.name": "latency.REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector.producer",
"producer.override.interceptor.timestamp.logger.buffer.size": "1",
name: REPLICATOR.LATENCY.INPUT-to-Retail-K2_Deposit.REPLICATOR.LATENCY.OUTPUT-connector
worker: synapse_replicator
json_template: replicator.json.j2
Настройка продюсера поставщика#
Добавить актуальную версию интерсептора в зависимости проекта.
Добавить настройки интерсептора к настройкам продюсера. Настройки указаны в примере.
Настройка консьюмера потребителя#
Добавить актуальную версию интерсептора в зависимости проекта.
Добавить настройки интерсептора к настройкам консьюмера. Настройки указаны в примере.
Результат#
Выполнены настройки коннектора компонента SEDR, продюсера поставщика и консьюмера потребителя.