Подключение Validator-interceptor к SEDR (валидация сообщений в форматах json/xml/avro)#

Пререквизиты#

  1. В {kafka_home}/libs должна находиться библиотека kafka-interceptors-bundle*-fatjat.jar (входит в дистрибутив репликатора).

  2. В конфигурации воркеров (в файле {kafka_home}/config/connect-distributed-<имя репликатора>.properties) проверить наличие настройки (в случае отсутствия добавить и перезапустить воркер после добавления):

connector.client.config.override.policy=All # Включает возможность переопределять конфигурацию consumer/producer в конфиге коннектора. (consumer.override. и producer.override.)

Установка#

  1. Загрузить файл с нужной схемой (json/avro/xml) на все воркеры в кластере (имя и путь до файла должны быть одинаковыми)

  2. Создать и загрузить на все воркеры в кластеры файл с конфигурацией validator-interceptor в соответствии с примером (комментарии, начинающиеся с «# удалить»):

schemas: {
    # Топик, для которого подключается валидатор (* = любой топик)
    "*": {
        # Тип валидатора (json, avro или xml)
        type: "json",
        # Путь до схемы
        schema: "/путь/до/схемы"
    }
}
  1. Создать и развернуть дистрибутив коннектора, добавив в additional_config настройки перехватчика в соответствии с примером (комментарии, начинающиеся с # удалить):

connectors:
- topics: имя source топика
  output.topic: имя sink топика
  tasks: 1
  additional_config: |
    # Подключить validator-interceptor к consumer
    "consumer.override.interceptor.classes": "ru.sbt.ss.kafka.validator.interceptor.ValidatorConsumerInterceptor",
    # Путь до директории, в которой хранятся файлы настроек перехватчика 
    "consumer.override.interceptor.validator.config.root.dir": "/full/path/to",
    # Добавить путь до файла относительно предыдущего параметра с конфигурацией валидтора
    "consumer.override.interceptor.validator.config": "относительный/путь/до/json-validator.conf",
    # Включить режим фильтрации сообщений (Ошибки валидации будут залогрованы в error, невалидные сообщения будут проигнорированы)
    "consumer.override.interceptor.validator.mode": "filter",
    # Включить валидацию json-схемы
    "consumer.override.interceptor.validator.schema.validation.enabled": "true"