Подключение Validator-interceptor к SEDR (валидация сообщений в форматах json/xml/avro)#
Пререквизиты#
В
{kafka_home}/libsдолжна находиться библиотекаkafka-interceptors-bundle*-fatjat.jar(входит в дистрибутив репликатора).В конфигурации воркеров (в файле
{kafka_home}/config/connect-distributed-<имя репликатора>.properties) проверить наличие настройки (в случае отсутствия добавить и перезапустить воркер после добавления):
connector.client.config.override.policy=All # Включает возможность переопределять конфигурацию consumer/producer в конфиге коннектора. (consumer.override. и producer.override.)
Установка#
Загрузить файл с нужной схемой (json/avro/xml) на все воркеры в кластере (имя и путь до файла должны быть одинаковыми)
Создать и загрузить на все воркеры в кластеры файл с конфигурацией validator-interceptor в соответствии с примером (комментарии, начинающиеся с «# удалить»):
schemas: {
# Топик, для которого подключается валидатор (* = любой топик)
"*": {
# Тип валидатора (json, avro или xml)
type: "json",
# Путь до схемы
schema: "/путь/до/схемы"
}
}
Создать и развернуть дистрибутив коннектора, добавив в
additional_configнастройки перехватчика в соответствии с примером (комментарии, начинающиеся с # удалить):
connectors:
- topics: имя source топика
output.topic: имя sink топика
tasks: 1
additional_config: |
# Подключить validator-interceptor к consumer
"consumer.override.interceptor.classes": "ru.sbt.ss.kafka.validator.interceptor.ValidatorConsumerInterceptor",
# Путь до директории, в которой хранятся файлы настроек перехватчика
"consumer.override.interceptor.validator.config.root.dir": "/full/path/to",
# Добавить путь до файла относительно предыдущего параметра с конфигурацией валидтора
"consumer.override.interceptor.validator.config": "относительный/путь/до/json-validator.conf",
# Включить режим фильтрации сообщений (Ошибки валидации будут залогрованы в error, невалидные сообщения будут проигнорированы)
"consumer.override.interceptor.validator.mode": "filter",
# Включить валидацию json-схемы
"consumer.override.interceptor.validator.schema.validation.enabled": "true"