Validator-interceptor#

Описание модуля#

Реализует интерфейсы ConsumerInterceptor/ProducerInterceptor и позволяет валидировать сообщения в форматах json/xml/avro по схемам, указанным в конфигурационном файле.

  • Перехватчики позволяют использовать разные схемы для разных topics.

  • Для форматов json и xml поддерживается валидация схем.

Валидаторы могут работать со следующими типами сообщений:

  • json:

    • String;

    • Array[Byte];

    • com.fasterxml.jackson.databind.JsonNode.

  • xml

    • String;

    • Array[Byte];

    • javax.xml.transform.Source;

    • org.w3c.dom.Document.

  • avro

    • Array[Byte].

Подключение к Kafka клиентам#

  1. Добавить актуальную версию перехватчика в зависимости проекта.

  2. Создать конфигурационный файл с настройками валидаторов и схем в соответствии с примером. Конфигурационный файл с настройками валидаторов и схем создается самостоятельно администратором.

  3. Добавить настройки перехватчика к настройкам kafka клиента в соответствии с примерами.

Примеры конфигурации#

Пример конфигурации файла со списком путей до схем#

Для каждого topic можно задать отдельную схему, а также схему по умолчанию с именем topic "*".

Имя topic, который будет использоваться как topic по умолчанию, можно задать с помощью настройки interceptor.validator.default.topic.

Если схема по умолчанию отсутствует и сообщение было отправлено/получено из topic, для которого не указана схема - сообщение считается НЕВАЛИДНЫМ.

Если валидация для какого-либо topic не требуется - можно настроить валидатор без схемы с типом noop.

schemas: {
    # Валидатор для topic "topic-1"
    "topic-1": {
        # Тип схемы/сообщений
        type: "json",
        # Путь до схемы на файловой системе
        schema: "path/to/schema.json"
        # ОПЦИОНАЛЬНО Кодировка схемы
        schemaEncoding: "UTF-8"
    }

    # Валидатор для topic  "topic-2"
    "topic-2": {
        # Тип схемы/сообщений
        type: "xml"
        # Путь до схемы на файловой системе
        schema: "file://path/to/schema.xml"
        # ОПЦИОНАЛЬНО Кодировка схемы
        schemaEncoding: "UTF-8"
    }

    # Валидатор для topic "topic-3"
    "topic-3": {
        # Тип схемы/сообщений
        type: "avro"
        # Путь до схемы в classpath
        schema: "classpath://path/to/schema.avsc"
        # ОПЦИОНАЛЬНО Кодировка схемы
        schemaEncoding: "UTF-8"
    }
    
    # Валидатор для всех остальных topics
    "*": {
        # Валидатор с типом noop считает любое сообщение валидным
        type: "noop"
    }
}

Пример конфигурации Kafka Producer#

...
# 1) Подключить перехватчик
interceptor.classes=ru.sbt.ss.kafka.validator.interceptor.ValidatorProducerInterceptor

# 2) Указать путь до конфигурационного файла с настройками валидаторов

# Загрузка из файловой системы
interceptor.validator.config=path/to/validator.conf 
# Загрузка из classpath
# interceptor.validator.config=classpath://path/to/validator.conf 

# 3) ОПЦИОНАЛЬНО Включить валидацию схем при запуске перехватчика
# interceptor.validator.schema.validation.enabled=true

# 4) ОПЦИОНАЛЬНО Настройить режим работы producer при ошибках валидации (по умолчанию failOnValue)
# interceptor.validator.mode = failOnValue

# 5) ОПЦИОНАЛЬНО Настроить провайдер для загрузки схем
# interceptor.validator.schema.provider.class =

Пример конфигурации Kafka Consumer#

...
# 1) Подключить перехватчик
interceptor.classes=ru.sbt.ss.kafka.validator.interceptor.ValidatorConsumerInterceptor

# 2) Указать путь до конфигурационного файла с настройками валидаторов
# Загрузка из файловой системы
interceptor.validator.config=path/to/validator.conf 
# Загрузка из classpath
# interceptor.validator.config=classpath://path/to/validator.conf 

# 3) ОПЦИОНАЛЬНО Включить валидацию схем при запуске перехватчика
# interceptor.validator.schema.validation.enabled=true

# 4) ОПЦИОНАЛЬНО Настройить режим работы consumer при ошибках валидации (по умолчанию failOnValue)
# interceptor.validator.mode = failOnValue

# 5) ОПЦИОНАЛЬНО Настроить провайдер для загрузки схем
# interceptor.validator.schema.provider.class =

Поведение при ошибках#

Поведение при ошибках валидации настраивается с помощью параметра interceptor.validator.mode.

В случае если сообщение не прошло валидацию при отправке (producer.send()) возможны следующие режимы:

  1. failOnValue (используется по умолчанию) - клиент получит сообщение-заглушку вместо невалидного сообщения, которое содержит null вместо value и выбросит исключение ru.sbt.ss.kafka.interceptors.ProducerInterceptorException при вызове record.value() (перед сериализацией).

Данный способ позволяет использовать логику обработки ошибок kafka-клиента, в том числе вызывать методы перехватчиков (но не callback).

  1. failOnSend - будет выброшено исключение ru.sbt.ss.kafka.interceptors.ProducerInterceptorError (extends Throwable) c сообщением, игнорирую логику обработки ошибок kafka-клиента.

Сообщения исключений имеют формат Error while processing record(topic: topic): *ошибка валидации в зависимости от формата сообщения*.

В случае, если сообщение не прошло валидацию при получении (consumer.poll()) поведение настраивается параметром interceptor.validator.mode:

  1. failOnValue (используется по умолчанию) — клиент получит сообщение-заглушку вместо невалидного сообщения, которое содержит null вместо value и выбросит исключение ru.sbt.ss.kafka.interceptors.ConsumerInterceptorException при вызове record.value().

  2. filter — сообщение об ошибке будет залогировано в error, клиент не получит невалидное сообщение.

  3. failOnConsume — метод consumer.poll() выбросит исключение ru.sbt.ss.kafka.interceptors.ConsumerInterceptorError, клиент не получит ни одного сообщения из пачки.

Загрузка конфигурации/схем из classpath#

Для загрузки конфигурации/схем из classpath небходимо указать протокол classpath:// в пути до файла, например:

  • для файла конфигурации: interceptor.validator.config=classpath://path/to/validator.conf ;

  • для файла со схемой: schema: "classpath://path/to/schema.xsd".

Если xsd схема была загружена из classpath — дополнительные схемы, указанные с помощью <include schemaLocation="additional.xsd"> тоже будут загружены из classpath, указывать префикс внутри схемы не обязательно.

Относительные пути в конфигурации#

Относительные пути до файлов в конфигурации обычно разрешаются относительно директории запуска приложения.

В некоторых случаях (для автоматических установок, например для установок с помощью скриптов) может быть полезно явно указать root директорию, относительно которой будут разрешаться относительные пути:

  • interceptor.config.root.dir (общая для всех перехватчиков с подобной настройкой);

  • interceptor.validator.config.root.dir (переопределяет общую настройку).

Данная настройка не применяется к путям в classpath (classpath://path/to/file.txt).

Пример конфигурации:

interceptor.validator.config.root.dir=/full/path/to
# interceptor.config.root.dir=/full/path/to

# /full/path/to/validator.conf
interceptor.validator.config=validator.conf

Также работает и для путей до схем в файле конфигурации валидатора:

schemas: {
 "topic-1": {
    type: "json",
    
    # /full/path/to/schema.conf
    schema: "schema.json" 
 }
 
 "topic-2": {
    type: "xml"
    
    # /full/path/to/schema.xml
    schema: "schema.xml" 
 }

Интерфейс провайдера для загрузки схем#

Перехватчик предоставляет интерфейс ru.sbt.ss.kafka.validator.ValidatorInterceptorSchemasProvider, позволяющий использовать свою реализацию провайдера для загрузки схем валидации.

Загрузка происходит следующим образом:

  1. Перехватчик получает имя класса провайдера из конфигурации kafka-клиента с помощью параметра intercerptor.validator.schema.provider.class.

  2. Перехватчик создает новый инстанс провайдера c помощью конструктора по умолчанию без параметров.

  3. Перехватчик вызывает метод void configure(Map<String, String> configs) c конфигурацией, переданной kafka-клиенту.

  4. Интересптор вызывает метод Map<String, SchemaProvider> getSchemas() и создает валидатор для каждого topic, присутствующего в Map<String, SchemaProvider> (ключ = имя topic).

package ru.sbt.ss.kafka.validator;

import ru.sbt.ss.validator.schema.SchemaProvider;

import java.util.Map;

/**
 * Schemas provider for validator-interceptor.
 *
 * Implementations should have default constructor without arguments.
 *
 * Interceptor will create and configure instance of this interface,
 * then call getSchemas() method and initialize validators for each entry (topic) of the schemas map.
 * Schemas map should include default topic as well.
 */
public interface ValidatorInterceptorSchemasProvider {

	/**
	 * This method is called once after interceptor creation
	 *
	 * @return map of topic -> {@link SchemaProvider}
	 */
	Map<String, SchemaProvider> getSchemas();

	/**
	 * Optional configuration method, will be called after provider creation.
	 *
	 * @param configs kafka configuration
	 */
	default void configure(Map<String, String> configs) {
		// NOOP
	}

}
/**
 * Validation schema provider for {@link ru.sbt.ss.validator.Validator}
 */
public interface SchemaProvider {

	/**
	 * Schema name, used in logging/metrics
	 */
	String getName();

	/**
	 * Schema as string
	 */
	String getSchema();

	/**
	 * Schema type
	 */
	SchemaType getType();
}

Пример реализации интерфейса провайдера схем#

package ru.sbt.ss.kafka.validator;

import ru.sbt.ss.kafka.validator.config.ValidatorInterceptorConfig;
import ru.sbt.ss.validator.schema.SchemaProvider;
import ru.sbt.ss.validator.schema.SchemaType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class CustomInterceptorSchemasProvider implements ValidatorInterceptorSchemasProvider {

	public static final String NOOP_VALIDATOR_ENABLED = ValidatorInterceptorConfig.ValidatorInterceptorPrefix() + "noop.validator.enabled";

	private static final SchemaProvider AVRO_SCHEMA_PROVIDER = new ClasspathSchemaProvider("avro/correct_avro_schema.avsc", SchemaType.AVRO);
	private static final SchemaProvider JSON_SCHEMA_PROVIDER = new ClasspathSchemaProvider("json/schema_first.json", SchemaType.JSON);
	private static final SchemaProvider XML_SCHEMA_PROVIDER = new ClasspathSchemaProvider("xml/schema/xml_schema.xsd", SchemaType.XML);
	private static final SchemaProvider NOOP_SCHEMA_PROVIDER = new SchemaProvider() {
		@Override
		public String getName() {
			return "noop";
		}

		@Override
		public String getSchema() {
			return null;
		}

		@Override
		public SchemaType getType() {
			return SchemaType.NOOP;
		}
	};

	private final Map<String, SchemaProvider> schemas = new HashMap<>();

	@Override
	public void configure(Map<String, String> configs) {
		schemas.put("avro-topic", AVRO_SCHEMA_PROVIDER);
		schemas.put("json-topic", JSON_SCHEMA_PROVIDER);
		schemas.put("xml-topic", XML_SCHEMA_PROVIDER);

		final boolean noopValidatorEnabled = Boolean.parseBoolean(configs.getOrDefault(NOOP_VALIDATOR_ENABLED, "true"));
		if (noopValidatorEnabled) schemas.put(ValidatorInterceptorConfig.DefaultTopicDefault(), NOOP_SCHEMA_PROVIDER);
	}

	@Override
	public Map<String, SchemaProvider> getSchemas() {
		return schemas;
	}


	public static class ClasspathSchemaProvider implements SchemaProvider {

		private final String name;

		private String schema;

		private final SchemaType type;

		public ClasspathSchemaProvider(String schemaPath, SchemaType type) {
			this.name = schemaPath;
			try {
				this.schema = new String(getClass().getClassLoader().getResourceAsStream(schemaPath).readAllBytes(), StandardCharsets.UTF_8);
			} catch (IOException e) {
				this.schema = "";
				e.printStackTrace();
			}
			this.type = type;
		}

		@Override
		public String getName() {
			return name;
		}

		@Override
		public String getSchema() {
			return schema;
		}

		@Override
		public SchemaType getType() {
			return type;
		}
	}
}

Валидация схем#

При настройке interceptor.validator.schema.validation.enabled=true json/xsd схемы будут провалидированы при загрузке.

Json#

Механизм валидации аналогичен валидации обычных сообщений, в качестве схемы используется json метасхема версии draft-04. Метасхема находится в /resources, при загрузке проверяется ее hash.

Метасхема дополнена следующими ограничениями:

  1. Для любого объекта схемы обязательно хотя бы одно поле. (ограничивает использование пустых объектов {} в схеме).

  2. Временно ограничено использование ссылок $ref.

  3. Для массивов ("type": "array") обязательны поля: maxItems, additionalItems=false, uniqueItems=true.

  4. Для объектов ("type": "object") обязательно поле additionalProperties. Поле может принимать значение false, либо {"type":"string"} с указанием maxProperties.

  5. Для чисел ("type": "number" или "type": "integer") обязательны поля minimum и maximum.

  6. Для строк ("type": "string") обязательно поле maxLength.

  7. При указании нескольких типов для поля ("type": ["object", "array"]) проверяются ограничения для всех присутствующих типов.

При неуспешной валидации будет выброшено исключение "Schema '<Имя схемы>' doesnt match meta schema:" + список ошибок валидации.

Также метасхема дополнена следующими некритичными ограничениями, не влияющими на прохождение валидации:

  1. Для любого объекта, имеющего тип ("type"), требуется аннотация "description".

  2. Максимальная длина строк ("type": "string") не должна быть больше 250 символов ("maxLength" <= 250)

Некритичные ограничения не влияют на прохождение схемой валидации, ошибки будут залогированы в WARN.

XSD#

XSD схема будет проверена на соответствие следующим ограничениям:

  1. Для представления числовой информации необходимо использовать ограничение “totalDigits” (xs:decimal и все типы, производные от него: xs:integer, xs:negativeInteger, xs:nonNegativeInteger,xs:nonPositiveInteger, xs:positiveInteger``xs:byte, xs:long, xs:int, xs:short, xs:unsignedLong, xs:unsignedInt, xs:unsignedShort, xs:unsignedByte).

  2. Запрещено использовать "any" для описания элементов (xs:any).

  3. Не допускается неограниченная длина элементов схемы (<xs:element maxOccurs="unbounded"/>).

При неуспешной валидации будет выброшено исключение: "XML Schema '$schema' validation failed: *список ошибок валдиации*".

Также будут проверены следующие некритичные ограничения, не влияющие на прохождение валидации:

  1. Все элементы схем должны быть аннотированы.

  2. Максимальная длина строк не должна быть больше 250 символов.

Некритичные ограничения не влияют на прохождение схемой валидации, ошибки будут залогированы в WARN.

Avro#

Валидация avro-схем не поддерживается.

JMX метрики#

При успешной загрузке перехватчика в jmx будут добавлены метрики с именем/типом схемы для каждого topic и флагом прохождения схемой валидации по пути в аттрибуте Value:

kafka.consumer:type=consumer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaName
kafka.consumer:type=consumer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaType
kafka.consumer:type=consumer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=validated

или

kafka.producer:type=producer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaName
kafka.producer:type=producer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=schemaType
kafka.producer:type=producer-interceptor-metrics,client-id=<client-id>,interceptor=ValidatorInterceptor,topic=<topic-name>,name=validated

Для схемы по умолчанию в имени метрики будет отсутствовать имя topic topic=<topic-name>.

Значения флага validated:

  • true - валидация схем включена и успешно пройдена;

  • false - валидация схем выключена.

client-id берется из конфигурации consumer client.id, при отсутствии в конфигурации генерируется автоматически.