Validator-interceptor#
Описание модуля#
Реализует интерфейсы ConsumerInterceptor/ProducerInterceptor и позволяет валидировать сообщения в форматах json/xml/avro по схемам, указанным в конфигурационном файле.
Перехватчики позволяют использовать разные схемы для разных topics.
Для форматов json и xml поддерживается валидация схем.
Валидаторы могут работать со следующими типами сообщений:
json:
String;
Array[Byte];
com.fasterxml.jackson.databind.JsonNode.
xml
String;
Array[Byte];
javax.xml.transform.Source;
org.w3c.dom.Document.
avro
Array[Byte].
Подключение к Kafka клиентам#
Добавить актуальную версию перехватчика в зависимости проекта.
Создать конфигурационный файл с настройками валидаторов и схем в соответствии с примером. Конфигурационный файл с настройками валидаторов и схем создается самостоятельно администратором.
Добавить настройки перехватчика к настройкам kafka клиента в соответствии с примерами.
Примеры конфигурации#
Пример конфигурации файла со списком путей до схем#
Для каждого topic можно задать отдельную схему, а также схему по умолчанию с именем topic "*".
Имя topic, который будет использоваться как topic по умолчанию, можно задать с помощью настройки interceptor.validator.default.topic.
Если схема по умолчанию отсутствует и сообщение было отправлено/получено из topic, для которого не указана схема - сообщение считается НЕВАЛИДНЫМ.
Если валидация для какого-либо topic не требуется - можно настроить валидатор без схемы с типом noop.
schemas: {
# Валидатор для topic "topic-1"
"topic-1": {
# Тип схемы/сообщений
type: "json",
# Путь до схемы на файловой системе
schema: "path/to/schema.json"
# ОПЦИОНАЛЬНО Кодировка схемы
schemaEncoding: "UTF-8"
}
# Валидатор для topic "topic-2"
"topic-2": {
# Тип схемы/сообщений
type: "xml"
# Путь до схемы на файловой системе
schema: "file://path/to/schema.xml"
# ОПЦИОНАЛЬНО Кодировка схемы
schemaEncoding: "UTF-8"
}
# Валидатор для topic "topic-3"
"topic-3": {
# Тип схемы/сообщений
type: "avro"
# Путь до схемы в classpath
schema: "classpath://path/to/schema.avsc"
# ОПЦИОНАЛЬНО Кодировка схемы
schemaEncoding: "UTF-8"
}
# Валидатор для всех остальных topics
"*": {
# Валидатор с типом noop считает любое сообщение валидным
type: "noop"
}
}
Пример конфигурации Kafka Producer#
...
# 1) Подключить перехватчик
interceptor.classes=ru.sbt.ss.kafka.validator.interceptor.ValidatorProducerInterceptor
# 2) Указать путь до конфигурационного файла с настройками валидаторов
# Загрузка из файловой системы
interceptor.validator.config=path/to/validator.conf
# Загрузка из classpath
# interceptor.validator.config=classpath://path/to/validator.conf
# 3) ОПЦИОНАЛЬНО Включить валидацию схем при запуске перехватчика
# interceptor.validator.schema.validation.enabled=true
# 4) ОПЦИОНАЛЬНО Настройить режим работы producer при ошибках валидации (по умолчанию failOnValue)
# interceptor.validator.mode = failOnValue
# 5) ОПЦИОНАЛЬНО Настроить провайдер для загрузки схем
# interceptor.validator.schema.provider.class =
Пример конфигурации Kafka Consumer#
...
# 1) Подключить перехватчик
interceptor.classes=ru.sbt.ss.kafka.validator.interceptor.ValidatorConsumerInterceptor
# 2) Указать путь до конфигурационного файла с настройками валидаторов
# Загрузка из файловой системы
interceptor.validator.config=path/to/validator.conf
# Загрузка из classpath
# interceptor.validator.config=classpath://path/to/validator.conf
# 3) ОПЦИОНАЛЬНО Включить валидацию схем при запуске перехватчика
# interceptor.validator.schema.validation.enabled=true
# 4) ОПЦИОНАЛЬНО Настройить режим работы consumer при ошибках валидации (по умолчанию failOnValue)
# interceptor.validator.mode = failOnValue
# 5) ОПЦИОНАЛЬНО Настроить провайдер для загрузки схем
# interceptor.validator.schema.provider.class =
Поведение при ошибках#
Поведение при ошибках валидации настраивается с помощью параметра interceptor.validator.mode.
В случае если сообщение не прошло валидацию при отправке (producer.send()) возможны следующие режимы:
failOnValue(используется по умолчанию) - клиент получит сообщение-заглушку вместо невалидного сообщения, которое содержит null вместо value и выбросит исключениеru.sbt.ss.kafka.interceptors.ProducerInterceptorExceptionпри вызовеrecord.value()(перед сериализацией).
Данный способ позволяет использовать логику обработки ошибок kafka-клиента, в том числе вызывать методы перехватчиков (но не callback).
failOnSend- будет выброшено исключениеru.sbt.ss.kafka.interceptors.ProducerInterceptorError(extends Throwable) c сообщением, игнорирую логику обработки ошибок kafka-клиента.
Сообщения исключений имеют формат Error while processing record(topic: topic): *ошибка валидации в зависимости от формата сообщения*.
В случае, если сообщение не прошло валидацию при получении (consumer.poll()) поведение настраивается параметром interceptor.validator.mode:
failOnValue(используется по умолчанию) — клиент получит сообщение-заглушку вместо невалидного сообщения, которое содержит null вместо value и выбросит исключениеru.sbt.ss.kafka.interceptors.ConsumerInterceptorExceptionпри вызовеrecord.value().filter— сообщение об ошибке будет залогировано в error, клиент не получит невалидное сообщение.failOnConsume— методconsumer.poll()выбросит исключениеru.sbt.ss.kafka.interceptors.ConsumerInterceptorError, клиент не получит ни одного сообщения из пачки.
Загрузка конфигурации/схем из classpath#
Для загрузки конфигурации/схем из classpath небходимо указать протокол classpath:// в пути до файла, например:
для файла конфигурации:
interceptor.validator.config=classpath://path/to/validator.conf;для файла со схемой:
schema: "classpath://path/to/schema.xsd".
Если xsd схема была загружена из classpath — дополнительные схемы, указанные с помощью <include schemaLocation="additional.xsd"> тоже будут загружены из classpath, указывать префикс внутри схемы не обязательно.
Относительные пути в конфигурации#
Относительные пути до файлов в конфигурации обычно разрешаются относительно директории запуска приложения.
В некоторых случаях (для автоматических установок, например для установок с помощью скриптов) может быть полезно явно указать root директорию, относительно которой будут разрешаться относительные пути:
interceptor.config.root.dir(общая для всех перехватчиков с подобной настройкой);interceptor.validator.config.root.dir(переопределяет общую настройку).
Данная настройка не применяется к путям в classpath (classpath://path/to/file.txt).
Пример конфигурации:
interceptor.validator.config.root.dir=/full/path/to
# interceptor.config.root.dir=/full/path/to
# /full/path/to/validator.conf
interceptor.validator.config=validator.conf
Также работает и для путей до схем в файле конфигурации валидатора:
schemas: {
"topic-1": {
type: "json",
# /full/path/to/schema.conf
schema: "schema.json"
}
"topic-2": {
type: "xml"
# /full/path/to/schema.xml
schema: "schema.xml"
}
Интерфейс провайдера для загрузки схем#
Перехватчик предоставляет интерфейс ru.sbt.ss.kafka.validator.ValidatorInterceptorSchemasProvider, позволяющий использовать свою реализацию провайдера для загрузки схем валидации.
Загрузка происходит следующим образом:
Перехватчик получает имя класса провайдера из конфигурации kafka-клиента с помощью параметра
intercerptor.validator.schema.provider.class.Перехватчик создает новый инстанс провайдера c помощью конструктора по умолчанию без параметров.
Перехватчик вызывает метод
void configure(Map<String, String> configs)c конфигурацией, переданной kafka-клиенту.Интересптор вызывает метод
Map<String, SchemaProvider> getSchemas()и создает валидатор для каждого topic, присутствующего вMap<String, SchemaProvider>(ключ = имя topic).
package ru.sbt.ss.kafka.validator;
import ru.sbt.ss.validator.schema.SchemaProvider;
import java.util.Map;
/**
* Schemas provider for validator-interceptor.
*
* Implementations should have default constructor without arguments.
*
* Interceptor will create and configure instance of this interface,
* then call getSchemas() method and initialize validators for each entry (topic) of the schemas map.
* Schemas map should include default topic as well.
*/
public interface ValidatorInterceptorSchemasProvider {
/**
* This method is called once after interceptor creation
*
* @return map of topic -> {@link SchemaProvider}
*/
Map<String, SchemaProvider> getSchemas();
/**
* Optional configuration method, will be called after provider creation.
*
* @param configs kafka configuration
*/
default void configure(Map<String, String> configs) {
// NOOP
}
}
/**
* Validation schema provider for {@link ru.sbt.ss.validator.Validator}
*/
public interface SchemaProvider {
/**
* Schema name, used in logging/metrics
*/
String getName();
/**
* Schema as string
*/
String getSchema();
/**
* Schema type
*/
SchemaType getType();
}
Пример реализации интерфейса провайдера схем#
package ru.sbt.ss.kafka.validator;
import ru.sbt.ss.kafka.validator.config.ValidatorInterceptorConfig;
import ru.sbt.ss.validator.schema.SchemaProvider;
import ru.sbt.ss.validator.schema.SchemaType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
public class CustomInterceptorSchemasProvider implements ValidatorInterceptorSchemasProvider {
public static final String NOOP_VALIDATOR_ENABLED = ValidatorInterceptorConfig.ValidatorInterceptorPrefix() + "noop.validator.enabled";
private static final SchemaProvider AVRO_SCHEMA_PROVIDER = new ClasspathSchemaProvider("avro/correct_avro_schema.avsc", SchemaType.AVRO);
private static final SchemaProvider JSON_SCHEMA_PROVIDER = new ClasspathSchemaProvider("json/schema_first.json", SchemaType.JSON);
private static final SchemaProvider XML_SCHEMA_PROVIDER = new ClasspathSchemaProvider("xml/schema/xml_schema.xsd", SchemaType.XML);
private static final SchemaProvider NOOP_SCHEMA_PROVIDER = new SchemaProvider() {
@Override
public String getName() {
return "noop";
}
@Override
public String getSchema() {
return null;
}
@Override
public SchemaType getType() {
return SchemaType.NOOP;
}
};
private final Map<String, SchemaProvider> schemas = new HashMap<>();
@Override
public void configure(Map<String, String> configs) {
schemas.put("avro-topic", AVRO_SCHEMA_PROVIDER);
schemas.put("json-topic", JSON_SCHEMA_PROVIDER);
schemas.put("xml-topic", XML_SCHEMA_PROVIDER);
final boolean noopValidatorEnabled = Boolean.parseBoolean(configs.getOrDefault(NOOP_VALIDATOR_ENABLED, "true"));
if (noopValidatorEnabled) schemas.put(ValidatorInterceptorConfig.DefaultTopicDefault(), NOOP_SCHEMA_PROVIDER);
}
@Override
public Map<String, SchemaProvider> getSchemas() {
return schemas;
}
public static class ClasspathSchemaProvider implements SchemaProvider {
private final String name;
private String schema;
private final SchemaType type;
public ClasspathSchemaProvider(String schemaPath, SchemaType type) {
this.name = schemaPath;
try {
this.schema = new String(getClass().getClassLoader().getResourceAsStream(schemaPath).readAllBytes(), StandardCharsets.UTF_8);
} catch (IOException e) {
this.schema = "";
e.printStackTrace();
}
this.type = type;
}
@Override
public String getName() {
return name;
}
@Override
public String getSchema() {
return schema;
}
@Override
public SchemaType getType() {
return type;
}
}
}
Валидация схем#
При настройке interceptor.validator.schema.validation.enabled=true json/xsd схемы будут провалидированы при загрузке.
Json#
Механизм валидации аналогичен валидации обычных сообщений, в качестве схемы используется json метасхема версии draft-04.
Метасхема находится в /resources, при загрузке проверяется ее hash.
Метасхема дополнена следующими ограничениями:
Для любого объекта схемы обязательно хотя бы одно поле. (ограничивает использование пустых объектов {} в схеме).
Временно ограничено использование ссылок $ref.
Для массивов (
"type": "array") обязательны поля: maxItems, additionalItems=false, uniqueItems=true.Для объектов (
"type": "object") обязательно поле additionalProperties. Поле может принимать значение false, либо {"type":"string"} с указанием maxProperties.Для чисел (
"type": "number"или"type": "integer") обязательны поля minimum и maximum.Для строк (
"type": "string") обязательно поле maxLength.При указании нескольких типов для поля (
"type": ["object", "array"]) проверяются ограничения для всех присутствующих типов.
При неуспешной валидации будет выброшено исключение "Schema '<Имя схемы>' doesnt match meta schema:" + список ошибок валидации.
Также метасхема дополнена следующими некритичными ограничениями, не влияющими на прохождение валидации:
Для любого объекта, имеющего тип (
"type"), требуется аннотация"description".Максимальная длина строк (
"type": "string") не должна быть больше 250 символов ("maxLength" <= 250)
Некритичные ограничения не влияют на прохождение схемой валидации, ошибки будут залогированы в WARN.
XSD#
XSD схема будет проверена на соответствие следующим ограничениям:
Для представления числовой информации необходимо использовать ограничение
“totalDigits”(xs:decimalи все типы, производные от него:xs:integer,xs:negativeInteger,xs:nonNegativeInteger,xs:nonPositiveInteger,xs:positiveInteger``xs:byte,xs:long,xs:int,xs:short,xs:unsignedLong,xs:unsignedInt,xs:unsignedShort,xs:unsignedByte).Запрещено использовать
"any"для описания элементов (xs:any).Не допускается неограниченная длина элементов схемы (
<xs:element maxOccurs="unbounded"/>).
При неуспешной валидации будет выброшено исключение: "XML Schema '$schema' validation failed: *список ошибок валдиации*".
Также будут проверены следующие некритичные ограничения, не влияющие на прохождение валидации:
Все элементы схем должны быть аннотированы.
Максимальная длина строк не должна быть больше 250 символов.
Некритичные ограничения не влияют на прохождение схемой валидации, ошибки будут залогированы в WARN.
Avro#
Валидация avro-схем не поддерживается.
JMX метрики#
При успешной загрузке перехватчика в jmx будут добавлены метрики с именем/типом схемы для каждого topic и флагом прохождения схемой валидации по пути в аттрибуте Value:
kafka.consumer:type=consumer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaName
kafka.consumer:type=consumer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaType
kafka.consumer:type=consumer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=validated
или
kafka.producer:type=producer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaName
kafka.producer:type=producer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaType
kafka.producer:type=producer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=validated
Для схемы по умолчанию в имени метрики будет отсутствовать имя topic topic=<topic-name>.
Значения флага validated:
true- валидация схем включена и успешно пройдена;false- валидация схем выключена.
client-id берется из конфигурации consumer client.id, при отсутствии в конфигурации генерируется автоматически.