Kafka-timestamp-interceptor#
Используется для замера задержки (latency) при прохождении через событийный сегмент.
Функции producer-interceptor:
Добавляет заголовок с текущим временем в сообщения;
Добавляет заголовок с уникальным идентификатором (UUID) в сообщения;
Рассчитывает latency записи сообщения в topic (от времени создания сообщения до времени подтверждения записи от брокера) и передает настраиваемым репортерам (подробнее о репортерах в примере конфигурации).
Функции consumer-interceptor:
Добавляет в запись заголовок со сгенерированным id сообщения (UUID).
Добавляет заголовок с временем сообщения (record.timestamp()).
Добавляет заголовок с временем чтения сообщения consumer.
Рассчитывает latency по настраиваемым заголовкам (от времени в заголовке сообщения до времени чтения сообщения) и передает настраиваемым репортерам (подробнее о репортерах в примере конфигурации).
Настройки producer-interceptor#
Добавить актуальную версию перехватчика в зависимости от проекта.
Добавить настройки перехватчика к настройкам producer в соответствии с примером.
Конфигурация producer#
# 1) Подключить interceptor
interceptor.classes = ru.sbt.ss.kafka.clients.timestamp.TimestampProducerInterceptor
# 2) Настроить список topicS. Interceptor будет обрабатывать только сообщения, отправленные в topicS из этого списка.
# Список можно оставить пустым - тогда interceptor будет обрабатывать все сообщения.
# interceptor.timestamp.topics = test, test-2
# 3) Настроить заголовки, добавляемые interceptor в сообщение:
# Имя заголовка с временем отправки сообщения producer
interceptor.timestamp.create.time.header = Latency.ProducerSystem.CreateTime
# Имя заголовка с сгенерированным UUID сообщения.
interceptor.timestamp.message.id.header = Latency.MessageId
# 3) ОПЦИОНАЛЬНО Настроить репортеры
# Настроить buffered-logger reporter, подробнее ниже
interceptor.timestamp.reporters = buffered-logger
# Настроить режим работы репортера, отключающий рассчет latency.
# Producer может рассчитывать только latency от времени сообщения до времени получения подтверждения об отправке этого сообщения.
# C настройкой topic message.timestamp.type=LogAppendTime время сообщения = время записи сообщения в topic, поэтому рассчитанная latency будет задержкой от времени записи в topic до времени получения подтверждения об отправке на клиенте (довольно бесполезная величина). Вместо latency в репортер отправляется время записи сообщения в topic.
interceptor.timstamp.reporter.mode = timestamp
# Имя заголовка с временем получения подтверждения отправки сообщения от брокера kafka
# Заголовком называется условно, в заголовки сообщения не попадает (сообщение уже отправлено), передается только как latency заголовок репортерам.
interceptor.timestamp.acknowledge.header = Latency.AckTime
# 4) ОПЦИОНАЛЬНО Настроить buffered-logger репортер. Данный репортер для каждого сообщения логирует:
# - метаданные сообщения (topic, партиция, оффсет)
# - acknowledge.header в формате timestamp (время получения подтверждения об отправке сообщения)
# Имя logger, в который будет логироваться latency. Также с помощью изменения уровня этого logger можно управлять перехватчиком.
interceptor.timestamp.logger.name = <имя logger>
# Размер in-memory буфера, в который репортер сохраняет свои сообщения. Сообщения логируются одной пачкой при полном заполнении буфера.
interceptor.timestamp.logger.buffer.size = 1
# 5) Использовать window-metrics репортер не имеет смысла, т.к. репортерам вместо latency передается timestamp (время получения подтверждения об отправке сообщения).
Пример сообщения buffered-logger репортера для producer#
{
"message": {
"topic": "test",
"partition": "1",
"offset": "0",
"timestamp": "1620981646442"
},
"attributes": { "AckTime": "1620981646444" }
}
Настройки consumer-interceptor#
Добавить актуальную версию перехватчика в зависимости проекта.
Добавить настройки перехватчика к настройкам consumer в соответствии с примером.
Конфигурация consumer#
# 1) Подключить interceptor
interceptor.classes = ru.sbt.ss.kafka.clients.timestamp.TimestampConsumerInterceptor
# 2) Настроить заголовки, добавляемые interceptor в сообщение:
# Отключить добавление заголовка с идентификатором сообщения (уже добавлен consumer или producer поставщика)
interceptor.timestamp.include.message.id.header = false
# Имя заголовка с временем из сообщения (record.timestamp()). При настройке topic message.timestamp.type=LogAppendTime в сообщении будет время записи сообщения в topic, добавленное брокером.
interceptor.timestamp.message.time.header = Latency.SinkCluster.LogAppendTime
# Имя заголовка с временем чтения сообщения в Consumer
interceptor.timestamp.consume.time.header = Latency.ConsumerSystem.ConsumeTime
# 3) ОПЦИОНАЛЬНО Настроить репортеры
# Настроить два репортера, подробнее про каждый ниже
interceptor.timestamp.reporters = buffered-logger, window-metrics
# Настроить имена заголовков, которые будут переданы репотерам в качестве идентификаторов сообщения (можно списком через ',')
interceptor.timestamp.message.id.headers = Latency.MessageId
# Настроить имена заголовков, которые будут переданы репортерам для расчета latency (можно списком через ',')
interceptor.timestamp.latency.headers = Latency.ProducerSystem.CreateTime, Latency.SourceCluster.LogAppendTime, Latency.Replicator.ConsumeTime, Latency.Replicator.CreateTime, Latency.SinkCluster.LogAppendTime
# 4) ОПЦИОНАЛЬНО Настроить buffered-logger репортер. Данный репортер для каждого сообщения логирует:
# - метаданные сообщения (topic, партиция, offset)
# - время чтения сообщения
# - список идентификаторов сообщения (значения заголовков из message.id.headers)
# - рассчитанную latency («от значения заголовка до времени чтения» для каждого заголовка из latency.headers)
# Имя logger, в который будет логироваться latency. Также с помощью изменения уровня этого logger можно управлять interceptor
interceptor.timestamp.logger.name = <имя logger>
# Размер in-memory буфера, в который репортер сохраняет свои сообщения. Сообщения логируются одной пачкой при полном заполнении буфера.
interceptor.timestamp.logger.buffer.size = 1
# 5) ОПЦИОНАЛЬНО Настроить window-metrics репортер. Данный репортер добавляет значение latency в JMX метрику, агрегируя среднее/максимальное и другие значения за указанный временной интервал.
# Также позволяет настроить лимиты для среднего и максимального значения за временной интервал. Если лимиты превышены - будет залогировано предупреждение.
# Имя заголовка, c которым будет работать репортер.
interceptor.timestamp.window-metrics.header.name = Latency.SourceCluster.LogAppendTime
# Временной интервал агрегации значений и проверки лимитов, в секундах
interceptor.timestamp.window-metrics.window.s = 60
# Лимиты среднего и максимального значения в ms
interceptor.timestamp.window-metrics.limits = avg=0, max=0
# Имя logger, в который будет залогировано предупреждение о превышении лимитов
interceptor.timestamp.window-metrics.limits.logger = <имя logger>
Пример сообщения buffered-logger репортера для consumer#
{
"message": {
"tags": { "MessageId": "ffcb4931-46eb-47ce-8bbf-8246e88272ce" },
"topic": "test",
"partition": "1",
"offset": "0",
"consumedAt": "1620981646444"
},
"attributes": { "CreateTime_latency": "222", "LogAppendTime_latency": "2" }
}
Репортеры#
logger репортер#
Данный репортер для каждого сообщения логирует:
метаданные сообщения (topic, партиция, оффсет);
время чтения сообщения;
список идентификаторов сообщения (значения заголовков из
message.id.headers);рассчитанную latency («от значения заголовка до времени чтения» для каждого заголовка из
latency.headers).
Пример сообщения logger репортера для producer:
{
"message": {
"topic": "test",
"partition": "1",
"offset": "0",
"timestamp": "1620981646442"
},
"attributes": { "AckTime": "1620981646444" }
}
Пример сообщения logger репортера для consumer:
{
"message": {
"tags": { "MessageId": "ffcb4931-46eb-47ce-8bbf-8246e88272ce" },
"topic": "test",
"partition": "1",
"offset": "0",
"consumedAt": "1620981646444"
},
"attributes": { "CreateTime_latency": "222", "LogAppendTime_latency": "2" }
}
buffered-logger репортер#
Накапливает записи в буфере (размер указывается параметром interceptor.timestamp.logger.buffer.size) и записывает записи в виде массива, когда буфер заполнен.
Позволяет увеличить производительность логирования.
Пример конфигурации:
interceptor.timestamp.reporter = buffered-logger
# Размер in-memory буфера, в который репортер сохраняет свои сообщения. Сообщения логируются одной пачкой при полном заполнении буфера.
interceptor.timestamp.logger.buffer.size = 256
Пример массива сообщений:
{
"messages": [
{
"message": {
"topic": "test",
"partition": "9",
"offset": "0",
"timestamp": "1621591467123"
},
"attributes": { "AckTime": "1621591467124" }
},
{
"message": {
"topic": "test",
"partition": "9",
"offset": "1",
"timestamp": "1621591467123"
},
"attributes": { "AckTime": "1621591467129" }
},
{
"message": {
"topic": "test",
"partition": "8",
"offset": "3",
"timestamp": "1621591468121"
},
"attributes": { "AckTime": "1621591468124" }
}
]
}
window-metrics репортер#
Записывает latency, рассчитанную по указанному заголовку (interceptor.timestamp.window-metrics.header.name) для последних interceptor.timestamp.window-metrics.window.s секунд в JMX-метрику.
Для JMX-метрики доступны следующие атрибуты:
50thPercentile75thPercentile95thPercentile98thPercentile999thPercentile99thPercentileCount(общее количество образцов с начала)MaxMean(среднее для текущего окна)MinSnapshotSize(количество образцов, используемых для вычисления статистики текущего окна)
interceptor.timestamp.reporter = window-metrics
# Настройки для 'window-metrics' репортера:
# Метрики будут представлены для этого заголовка
# interceptor.timestamp.window-metrics.header.name = AckTime
# Размер окна в секундах
# interceptor.timestamp.window-metrics.window.s = 60
#
# Список лимитов метрик через запятую, в limitName=limitValueMs формате. Поддерживаемые лимиты 'avg', 'min', 'max'.
# Каждая window.s интервальная метрика будет проверена, и если один из лимитов превышен - предупреждение будет залогировано.
# interceptor.timestamp.window-metrics.limits = avg=50, max=150
#
# Имя логгера для предупреждений по лимитам
# interceptor.timestamp.window-metrics.limits.logger = ru.sbt.ss.utils.metrics.HistogramLimits
Репортер может логировать предупреждения о метриках, превышающих установленные лимиты. Лимиты проверяются каждые interceptor.timestamp.window-metrics.window.s секунд.
Пример для interceptor.timestamp.window-metrics.limits = avg=0:
[WARN ] [r.s.ss.utils.metrics.HistogramLimits] 'type=consumer-interceptor-metrics,client-id=consumer-test_consumer-1,interceptor=TimestampInterceptor,topic=test,name=latency' 'AVERAGE' limit exceeded: 5.5 > 0.
Если указанный заголовок interceptor.timestamp.window-metrics.header.name отсутствует в сообщении или содержит неправильные данные - вместо него будет подставлено значение MaxLong.
Управление перехватчиками через уровни логирования#
Перехватчиками можно управлять с помощью изменения уровня логирования:
Уровень логирования |
Добавление заголовков в сообщение |
Агрегация в JMX метриках (window-metrics reporter) |
Логирование данных по каждому сообщению (buffered-logger reporter) |
|---|---|---|---|
OFF |
- |
- |
- |
ERROR |
+ |
- |
- |
WARN |
+ |
- |
- |
INFO |
+ |
+ |
- |
DEBUG |
+ |
+ |
+ |
Пример конфигурации logger (logback):
<?xml version="1.0" encoding="UTF-8"?>
<!-- Enable configuration updates check every 30 seconds -->
<configuration scan="true" scanPeriod="30 seconds">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%X{ott.req.id} %d{HH:mm:ss.SSS} [%-5level] [%logger{36}] [%thread] - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
<!-- level="debug" - process latency data and write it to log -->
<!-- level="info" - process latency data, don't write it to log (write it to other reporters, if present) -->
<!-- level="off" - don't process latency data -->
<logger name="ru.sbt.ss.kafka.clients.timestamp.reporter.LoggerReporter" level="debug"/>
</configuration>
Производительность logger#
Интенсивное логирование может снизить производительность producer и consumer. Для уменьшения влияния логирования на обработку сообщений можно:
Использовать
ch.qos.logback.classic.AsyncAppender, который будет обрабатывать все вызовы logger в отдельном потоке, не блокируя обработку сообщений. Ниже пример конфигурации с использованиемAsyncAppender:
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>myapp.log</file>
<encoder>
<pattern>%logger{35} - %msg%n</pattern>
</encoder>
</appender>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="FILE" />
<!-- IMPORTANT! If not set to 0 - logger will discard messages if buffer overflows. -->
<discardingThreshold>0</discardingThreshold>
</appender>
<root level="DEBUG">
<appender-ref ref="ASYNC" />
</root>
</configuration>
Использовать репортер buffered-logger, который накапливает записи в буфере (размер указывается параметром
interceptor.timestamp.logger.buffer.size) и записывает записи в виде массива, когда буфер заполнен.
Пример конфигурации:
interceptor.timestamp.reporter = buffered-logger
interceptor.timestamp.logger.buffer.size = 256
Пример массива сообщений:
{
"messages": [
{
"message": {
"topic": "test",
"partition": "9",
"offset": "0",
"timestamp": "1621591467123"
},
"attributes": { "AckTime": "1621591467124" }
},
{
"topic": "test",
"message": {
"partition": "9",
"offset": "1",
"timestamp": "1621591467123"
},
"attributes": { "AckTime": "1621591467129" }
},
{
"message": {
"topic": "test",
"partition": "8",
"offset": "3",
"timestamp": "1621591468121"
},
"attributes": { "AckTime": "1621591468124" }
}
]
}
Подключение kafka-timestamp-interceptor к SEDR#
Kafka-timestamp-interceptor можно подключить к программному компоненту SEDR программного продукта Platform V Synapse Event replication. Механизм подключения описан в документации SEDR в документе Руководство оператора в разделе Подключение kafka-timestamp-interceptor к SEDR.