Kafka-timestamp-interceptor#

Используется для замера задержки (latency) при прохождении через событийный сегмент.

Функции producer-interceptor:

  1. Добавляет заголовок с текущим временем в сообщения;

  2. Добавляет заголовок с уникальным идентификатором (UUID) в сообщения;

  3. Рассчитывает latency записи сообщения в topic (от времени создания сообщения до времени подтверждения записи от брокера) и передает настраиваемым репортерам (подробнее о репортерах в примере конфигурации).

Функции consumer-interceptor:

  1. Добавляет в запись заголовок со сгенерированным id сообщения (UUID).

  2. Добавляет заголовок с временем сообщения (record.timestamp()).

  3. Добавляет заголовок с временем чтения сообщения consumer.

  4. Рассчитывает latency по настраиваемым заголовкам (от времени в заголовке сообщения до времени чтения сообщения) и передает настраиваемым репортерам (подробнее о репортерах в примере конфигурации).

Настройки producer-interceptor#

  1. Добавить актуальную версию перехватчика в зависимости от проекта.

  2. Добавить настройки перехватчика к настройкам producer в соответствии с примером.

Конфигурация producer#

# 1) Подключить interceptor
interceptor.classes = ru.sbt.ss.kafka.clients.timestamp.TimestampProducerInterceptor

# 2) Настроить список topicS. Interceptor будет обрабатывать только сообщения, отправленные в topicS из этого списка.
# Список можно оставить пустым - тогда interceptor будет обрабатывать все сообщения.
# interceptor.timestamp.topics = test, test-2

# 3) Настроить заголовки, добавляемые interceptor в сообщение:
# Имя заголовка с временем отправки сообщения producer
interceptor.timestamp.create.time.header = Latency.ProducerSystem.CreateTime

# Имя заголовка с сгенерированным UUID сообщения.
interceptor.timestamp.message.id.header = Latency.MessageId

# 3) ОПЦИОНАЛЬНО Настроить репортеры

# Настроить buffered-logger reporter, подробнее ниже
interceptor.timestamp.reporters = buffered-logger

# Настроить режим работы репортера, отключающий рассчет latency.
# Producer может рассчитывать только latency от времени сообщения до времени получения подтверждения об отправке этого сообщения.
# C настройкой topic message.timestamp.type=LogAppendTime время сообщения = время записи сообщения в topic, поэтому рассчитанная latency будет задержкой от времени записи в topic до времени получения подтверждения об отправке на клиенте (довольно бесполезная величина). Вместо latency в репортер отправляется время записи сообщения в topic.
interceptor.timstamp.reporter.mode = timestamp

# Имя заголовка с временем получения подтверждения отправки сообщения от брокера kafka
# Заголовком называется условно, в заголовки сообщения не попадает (сообщение уже отправлено), передается только как latency заголовок репортерам.
interceptor.timestamp.acknowledge.header = Latency.AckTime

# 4) ОПЦИОНАЛЬНО Настроить buffered-logger репортер. Данный репортер для каждого сообщения логирует:
# - метаданные сообщения (topic, партиция, оффсет)
# - acknowledge.header в формате timestamp (время получения подтверждения об отправке сообщения)

# Имя logger, в который будет логироваться latency. Также с помощью изменения уровня этого logger можно управлять перехватчиком.
interceptor.timestamp.logger.name = <имя logger>
# Размер in-memory буфера, в который репортер сохраняет свои сообщения. Сообщения логируются одной пачкой при полном заполнении буфера.
interceptor.timestamp.logger.buffer.size = 1

# 5) Использовать window-metrics репортер не имеет смысла, т.к. репортерам вместо latency передается timestamp (время получения подтверждения об отправке сообщения).

Пример сообщения buffered-logger репортера для producer#

{
    "message": {
        "topic": "test",
        "partition": "1",
        "offset": "0",
        "timestamp": "1620981646442"
    },
    "attributes": { "AckTime": "1620981646444" }
}

Настройки consumer-interceptor#

  1. Добавить актуальную версию перехватчика в зависимости проекта.

  2. Добавить настройки перехватчика к настройкам consumer в соответствии с примером.

Конфигурация consumer#

# 1) Подключить interceptor
interceptor.classes = ru.sbt.ss.kafka.clients.timestamp.TimestampConsumerInterceptor
  
# 2) Настроить заголовки, добавляемые interceptor в сообщение:
  
# Отключить добавление заголовка с идентификатором сообщения (уже добавлен consumer или producer поставщика)
interceptor.timestamp.include.message.id.header = false
# Имя заголовка с временем из сообщения (record.timestamp()). При настройке topic message.timestamp.type=LogAppendTime в сообщении будет время записи сообщения в topic, добавленное брокером.
interceptor.timestamp.message.time.header = Latency.SinkCluster.LogAppendTime
# Имя заголовка с временем чтения сообщения в Consumer
interceptor.timestamp.consume.time.header = Latency.ConsumerSystem.ConsumeTime
  
# 3) ОПЦИОНАЛЬНО Настроить репортеры
  
# Настроить два репортера, подробнее про каждый ниже
interceptor.timestamp.reporters = buffered-logger, window-metrics
# Настроить имена заголовков, которые будут переданы репотерам в качестве идентификаторов сообщения (можно списком через ',')
interceptor.timestamp.message.id.headers = Latency.MessageId
# Настроить имена заголовков, которые будут переданы репортерам для расчета latency (можно списком через ',')
interceptor.timestamp.latency.headers = Latency.ProducerSystem.CreateTime, Latency.SourceCluster.LogAppendTime, Latency.Replicator.ConsumeTime, Latency.Replicator.CreateTime, Latency.SinkCluster.LogAppendTime
  
# 4) ОПЦИОНАЛЬНО Настроить buffered-logger репортер. Данный репортер для каждого сообщения логирует:
# - метаданные сообщения (topic, партиция, offset)
# - время чтения сообщения
# - список идентификаторов сообщения (значения заголовков из message.id.headers)
# - рассчитанную latency («от значения заголовка до времени чтения» для каждого заголовка из latency.headers)
  
# Имя logger, в который будет логироваться latency. Также с помощью изменения уровня этого logger можно управлять interceptor
interceptor.timestamp.logger.name = <имя logger>
# Размер in-memory буфера, в который репортер сохраняет свои сообщения. Сообщения логируются одной пачкой при полном заполнении буфера.
interceptor.timestamp.logger.buffer.size = 1
  
# 5) ОПЦИОНАЛЬНО Настроить window-metrics репортер. Данный репортер добавляет значение latency в JMX метрику, агрегируя среднее/максимальное и другие значения за указанный временной интервал.
# Также позволяет настроить лимиты для среднего и максимального значения за временной интервал. Если лимиты превышены - будет залогировано предупреждение.
  
# Имя заголовка, c которым будет работать репортер.
interceptor.timestamp.window-metrics.header.name = Latency.SourceCluster.LogAppendTime
# Временной интервал агрегации значений и проверки лимитов, в секундах
interceptor.timestamp.window-metrics.window.s = 60
# Лимиты среднего и максимального значения в ms
interceptor.timestamp.window-metrics.limits = avg=0, max=0
# Имя logger, в который будет залогировано предупреждение о превышении лимитов
interceptor.timestamp.window-metrics.limits.logger = <имя logger>

Пример сообщения buffered-logger репортера для consumer#

{
    "message": {
        "tags": { "MessageId": "ffcb4931-46eb-47ce-8bbf-8246e88272ce" },
        "topic": "test",
        "partition": "1",
        "offset": "0",
        "consumedAt": "1620981646444"
    },
    "attributes": { "CreateTime_latency": "222", "LogAppendTime_latency": "2" }
}

Репортеры#

logger репортер#

Данный репортер для каждого сообщения логирует:

  • метаданные сообщения (topic, партиция, оффсет);

  • время чтения сообщения;

  • список идентификаторов сообщения (значения заголовков из message.id.headers);

  • рассчитанную latency («от значения заголовка до времени чтения» для каждого заголовка из latency.headers).

Пример сообщения logger репортера для producer:

{
    "message": {
        "topic": "test",
        "partition": "1",
        "offset": "0",
        "timestamp": "1620981646442"
    },
    "attributes": { "AckTime": "1620981646444" }
}

Пример сообщения logger репортера для consumer:

{
    "message": {
        "tags": { "MessageId": "ffcb4931-46eb-47ce-8bbf-8246e88272ce" },
        "topic": "test",
        "partition": "1",
        "offset": "0",
        "consumedAt": "1620981646444"
    },
    "attributes": { "CreateTime_latency": "222", "LogAppendTime_latency": "2" }
}

buffered-logger репортер#

Накапливает записи в буфере (размер указывается параметром interceptor.timestamp.logger.buffer.size) и записывает записи в виде массива, когда буфер заполнен. Позволяет увеличить производительность логирования.

Пример конфигурации:

interceptor.timestamp.reporter = buffered-logger
# Размер in-memory буфера, в который репортер сохраняет свои сообщения. Сообщения логируются одной пачкой при полном заполнении буфера.
interceptor.timestamp.logger.buffer.size = 256

Пример массива сообщений:

{
"messages": [
    {
    "message": {
        "topic": "test",
        "partition": "9",
        "offset": "0",
        "timestamp": "1621591467123"
    },
    "attributes": { "AckTime": "1621591467124" }
    },
    {
    "message": {
        "topic": "test",
        "partition": "9",
        "offset": "1",
        "timestamp": "1621591467123"
    },
    "attributes": { "AckTime": "1621591467129" }
    },
    {
    "message": {
        "topic": "test",
        "partition": "8",
        "offset": "3",
        "timestamp": "1621591468121"
    },
    "attributes": { "AckTime": "1621591468124" }
    }
]
}

window-metrics репортер#

Записывает latency, рассчитанную по указанному заголовку (interceptor.timestamp.window-metrics.header.name) для последних interceptor.timestamp.window-metrics.window.s секунд в JMX-метрику.

Для JMX-метрики доступны следующие атрибуты:

  • 50thPercentile

  • 75thPercentile

  • 95thPercentile

  • 98thPercentile

  • 999thPercentile

  • 99thPercentile

  • Count (общее количество образцов с начала)

  • Max

  • Mean (среднее для текущего окна)

  • Min

  • SnapshotSize (количество образцов, используемых для вычисления статистики текущего окна)

interceptor.timestamp.reporter = window-metrics

# Настройки для 'window-metrics' репортера:
# Метрики будут представлены для этого заголовка
# interceptor.timestamp.window-metrics.header.name = AckTime

# Размер окна в секундах
# interceptor.timestamp.window-metrics.window.s = 60
#
# Список лимитов метрик через запятую, в limitName=limitValueMs формате. Поддерживаемые лимиты 'avg', 'min', 'max'.
# Каждая window.s интервальная метрика будет проверена, и если один из лимитов превышен - предупреждение будет залогировано.
# interceptor.timestamp.window-metrics.limits = avg=50, max=150
#
# Имя логгера для предупреждений по лимитам
# interceptor.timestamp.window-metrics.limits.logger = ru.sbt.ss.utils.metrics.HistogramLimits

Репортер может логировать предупреждения о метриках, превышающих установленные лимиты. Лимиты проверяются каждые interceptor.timestamp.window-metrics.window.s секунд.

Пример для interceptor.timestamp.window-metrics.limits = avg=0:

[WARN ] [r.s.ss.utils.metrics.HistogramLimits] 'type=consumer-interceptor-metrics,client-id=consumer-test_consumer-1,interceptor=TimestampInterceptor,topic=test,name=latency' 'AVERAGE' limit exceeded: 5.5 > 0.

Если указанный заголовок interceptor.timestamp.window-metrics.header.name отсутствует в сообщении или содержит неправильные данные - вместо него будет подставлено значение MaxLong.

Управление перехватчиками через уровни логирования#

Перехватчиками можно управлять с помощью изменения уровня логирования:

Уровень логирования

Добавление заголовков в сообщение

Агрегация в JMX метриках (window-metrics reporter)

Логирование данных по каждому сообщению (buffered-logger reporter)

OFF

-

-

-

ERROR

+

-

-

WARN

+

-

-

INFO

+

+

-

DEBUG

+

+

+

Пример конфигурации logger (logback):

<?xml version="1.0" encoding="UTF-8"?>
<!-- Enable configuration updates check every 30 seconds -->
<configuration scan="true" scanPeriod="30 seconds">
     
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%X{ott.req.id} %d{HH:mm:ss.SSS} [%-5level] [%logger{36}] [%thread] - %msg%n</pattern>
        </encoder>
    </appender>
     
    <root level="info">
        <appender-ref ref="STDOUT"/>
    </root>
     
    <!-- level="debug" - process latency data and write it to log -->
    <!-- level="info" - process latency data, don't write it to log (write it to other reporters, if present) -->
    <!-- level="off" - don't process latency data -->
    <logger name="ru.sbt.ss.kafka.clients.timestamp.reporter.LoggerReporter" level="debug"/>
 
</configuration>

Производительность logger#

Интенсивное логирование может снизить производительность producer и consumer. Для уменьшения влияния логирования на обработку сообщений можно:

  1. Использовать ch.qos.logback.classic.AsyncAppender, который будет обрабатывать все вызовы logger в отдельном потоке, не блокируя обработку сообщений. Ниже пример конфигурации с использованием AsyncAppender:

<configuration>
  <appender name="FILE" class="ch.qos.logback.core.FileAppender">
    <file>myapp.log</file>
    <encoder>
      <pattern>%logger{35} - %msg%n</pattern>
    </encoder>
  </appender>
 
  <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="FILE" />
      <!-- IMPORTANT! If not set to 0 - logger will discard messages if buffer overflows. -->
      <discardingThreshold>0</discardingThreshold>
  </appender>
 
  <root level="DEBUG">
    <appender-ref ref="ASYNC" />
  </root>
</configuration>
  1. Использовать репортер buffered-logger, который накапливает записи в буфере (размер указывается параметром interceptor.timestamp.logger.buffer.size) и записывает записи в виде массива, когда буфер заполнен.

Пример конфигурации:

interceptor.timestamp.reporter = buffered-logger
interceptor.timestamp.logger.buffer.size = 256

Пример массива сообщений:

{
   "messages": [
      {
         "message": {
            "topic": "test",
            "partition": "9",
            "offset": "0",
            "timestamp": "1621591467123"
         },
         "attributes": { "AckTime": "1621591467124" }
      },
      {
            "topic": "test",
         "message": {
            "partition": "9",
            "offset": "1",
            "timestamp": "1621591467123"
         },
         "attributes": { "AckTime": "1621591467129" }
      },
      {
         "message": {
            "topic": "test",
            "partition": "8",
            "offset": "3",
            "timestamp": "1621591468121"
         },
         "attributes": { "AckTime": "1621591468124" }
      }
   ]
}

Подключение kafka-timestamp-interceptor к SEDR#

Kafka-timestamp-interceptor можно подключить к программному компоненту SEDR программного продукта Platform V Synapse Event replication. Механизм подключения описан в документации SEDR в документе Руководство оператора в разделе Подключение kafka-timestamp-interceptor к SEDR.