Kafka-ott-signature-serde#
Описание модуля#
Реализует интерфейсы Serializer/Deserializer, предназначен для получения OTT токенов (One Time Token), а также для подписи/проверки подписи сообщений с помощью OTT.
Использует библиотеку sbp.com.sbt.ott:ott-jsonrpc-hb-client:4.3.1-157.

Логика работы:
Producer-signature-plugin (serializer) запрашивает токен у централизованного сервиса OTT.
Producer-signature-plugin (serializer) сериализует сообщение сериализатором, указанным с помощью настройки
*.serializer.delegate.Producer-signature-plugin (serializer) подписывает тело сообщения (record.value()) с помощью токена.
Producer-signature-plugin (serializer) добавляет подпись (в Base64) и токен (в формате jks) в заголовки сообщения.
Consumer-signature-plugin (deserializer) получает токен (jws) из заголовка сообщения и проверяет его валидность.
Consumer-signature-plugin (deserializer) получает подпись из заголовка сообщения и проверяет его валидность с помощью токена.
Consumer-signature-plugin (deserializer) десериализует сообщение десериализатором, указанным с помощью настройки
*.deserializer.delegate.В случае, если сообщение не прошло проверку подписи при получении (
consumer.poll()) поведение настраивается параметром*.deserializer.signature.mode:
nullValueWithHeader(используется по умолчанию) – клиент получит сообщение, которое содержит null вместо value и дополнительный заголовокvalidation.errorc сообщением об ошибке;failOnConsume– методconsumer.poll()выбросит исключение, клиент не получит ни одного сообщения из пачки.
Подключение к Kafka-клиентам#
Добавить актуальную версию перехватчика в зависимости проекта.
Сконфигурировать kafka-клиенты в соответствии с примерами.
Подключение к Spring Kafka#
Для настройки клиента Kafka в Spring необходимо выполнить следующие шаги:
1. Настройка KafkaTemplate и ProducerFactory#
@Configuration
public static class JsonConfigConsumer {
@Bean
public ProducerFactory<String, String> producerFactorySettingsFromFile() {
/**
Указываем путь до параметров `PATH_TO_PRODUCER_VALID_PROPERTIES`, дополнительные параметры можно передать через `Map.of("nameParam, valueParam")`
*/
final var producerProperties = loadProperties(PATH_TO_PRODUCER_VALID_PROPERTIES, Map.of());
return new DefaultKafkaProducerFactory<>(propertiesToMapConverter(producerProperties));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplateSettingsFromFile(ProducerFactory<String, String> producerFactorySettingsFromFile) {
return new KafkaTemplate<>(producerFactorySettingsFromFile);
}
private Properties loadProperties(String path, Map<String, String> additionalSettings) {
try {
return FileUtils.loadPropertiesFromClasspath(path, additionalSettings);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
2. Настройка Listeners и ConsumerFactory#
@Configuration
public static class JsonConfigConsumer {
@Bean
public ConsumerFactory<String, String> consumerFactorySettingsFromFile() {
/**
Указываем путь до параметров `PATH_TO_CONSUMER_VALID_PROPERTIES`, дополнительные параметры можно передать через `Map.of("nameParam, valueParam")`
*/
Properties consumerProperties = loadProperties(PATH_TO_CONSUMER_VALID_PROPERTIES, Map.of());
return new DefaultKafkaConsumerFactory<>(propertiesToMapConverter(consumerProperties));
}
@Bean
public KafkaListenerContainerFactory<
ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactorySettingsFromFile(
ConsumerFactory<String, String> consumerFactorySettingsFromFile,
ErrorHandler createErrorHandler
) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactorySettingsFromFile);
return factory;
}
}
3. Настройка Listener#
@EnableKafka
@Component
public class ListenersKafkaConsumerJson {
@KafkaListener(
id = "id-1",
topics = "NAME_TOPIC",
groupId = "group-id",
/**
Необходимо указать имя bean фабрики с нужными параметрами.
*/
containerFactory = "kafkaListenerContainerFactorySettingsFromFile",
)
private void listenerConfigurationFile(String data) {
try {
JsonConsumerSpringTest.RESPONSE_QUEUE.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
4. Настройка ErrorHandler (опционально)#
Чтобы задать логику обработки ошибок, необходимо при создании KafkaListenerContainerFactory задать собственную реализацию ErrorHandler, затем передать в метод setErrorHandler(errorHandler).
Пример реализации ErrorHandler:
new ErrorHandler() {
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
throw e;
}
}
Конфигурация ОТТ-запроса#
Для OttSignatureSerializer атрибуты ott.request.* отправляются на сервер ОТТ для авторизации запроса на создание токена. Некоторые параметры попадают в токен в виде token claims.
Для OttSignatureDeserializer атрибуты ott.request.* используются для проверки токена в заголовке сообщения. Все атрибуты, указанные в конфигурации считаются обязательными, если они отсутствуют в токене - токен не пройдет валидацию и подпись будет считать невалидной.
Настройки ОТТ-запроса ott.request.* транслируются в вызовы методов билдера com.sbt.ott.api.client.TokenRequestBuilder ОТТ-клиента.
Атрибуты ott.request.add.* в конфигурации consumer должны быть указаны как у producer для корректной валидации параметров в подписи события.
Строковые параметры#
Параметр конфигурации |
Метод билдера |
|---|---|
|
|
|
|
|
|
|
|
|
|
Фиксированные составные параметры (для подраздела Создание атрибутов)#
Могут состоять из опциональной категории, имени параметра и его значения, разделитель по умолчанию #.
!ВАЖНО Атрибуты ott.request.add.* в конфигурации consumer должны быть указаны как у producer для корректной валидации параметров в подписи события.
Параметр |
Метод |
|---|---|
|
|
|
|
|
|
|
|
Прочие составные параметры (для подраздела Создание атрибутов)#
Задаются аналогично фиксированным составным параметрам, можно задать несколько параметров из каждой категории ott.request.add.environment или ott.request.add.attribute.
!ВАЖНО Атрибуты ott.request.add.* в конфигурации consumer должны быть указаны как у producer для корректной валидации параметров в подписи события.
Параметр |
Метод |
|---|---|
|
|
|
|
Имена параметров после ott.request.add.environment. и ott.request.add.attribute. игнорируются, но должны быть уникальны для корректной передачи всех параметров в конфигурацию.
Пример конфигурации прочих параметров:
ott.request.add.environment.env1 = testEnvAttributeName1#testEnvAttributeValue1
ott.request.add.environment.env2 = testEnvAttributeName2#testEnvAttributeValue2
ott.request.add.attribute.1 = testAttributeName1#testAttributeValue1
ott.request.add.attribute.2 = testAttributeName2#testAttributeValue2
Создание атрибутов#
// Для составного параметра со значением ott.request.add.subject = subjectCategory#subjectId#subjectValue
var attribute = new BasicAttribute();
attribute.setCategory("subjectCategory");
attribute.setId("subjectId");
attribute.addValue("subjectValue");
attribute.setMandatory(true);
Добавление имени topic в параметр ОТТ-запроса#
Для добавления имени topic в параметр запроса можно использовать ${topic}:
ott.request.add.resource = testResourceAttributeName#${topic}
Примеры конфигурации#
Настройки Kafka Producer и Kafka Consumer конфигурируются там же, где и настройки Kafka-клиента.
Пример конфигурации Kafka Producer#
# 1) Подключить сериализатор
value.serializer = ru.sbt.ss.kafka.serialization.OttSignatureSerializer
value.serializer.delegate = org.apache.kafka.common.serialization.StringSerializer
# 2) Настроить ОТТ-клиент
# Id модуля (также alias клиентского сертификата)
value.serializer.signature.ott.module.id = ott-test-moduleA
# Хосты ОТТ
value.serializer.signature.ott.service.url = https://ott-service.otts-std-ift2.ingress.apps.ocp.ift-02.solution.sbt
value.serializer.signature.ott.service.hosts = { IP_ADDRESS }
# Realm авторизации
value.serializer.signature.ott.authz.realm = mmt
# ОПЦИОНАЛЬНО закрывать ОТТ-клиент при закрытии kafka-клиента
# Библиотека ott кеширует клиенты по module.id, но не удаляет их из кэша при закрытии,
# из-за чего новый (или перезапустившийся) kafka-клиент может получить из кеша уже закрытый ott-клиент
# По умолчанию ott-клиент не закрывается
# value.serializer.signature.ott.close.client = false
# 3) Настроить параметры запроса к ОТТ-сервису
# Атрибуты `*.ott.request.*` отправляются на сервер отт для авторизации запроса на создание токена. Некоторые параметры попадают в токен в виде token claims.
# ОПЦИОНАЛЬНО настроить разделитель строки составного параметра
# value.serializer.signature.ott.attribute.delim = #
# 3.1) Пример настройки для realm mmt (целевой на момент написания документации):
value.serializer.signature.ott.request.add.subject = urn:sbrf:names:pprb:1.0:module:id#ott-test-moduleA
value.serializer.signature.ott.request.add.resource = urn:sbrf:names:pprb:1.0:api:interface:fullname#${topic}
value.serializer.signature.ott.request.add.environment.realm = ott:realm#mmt
# 3.2) Пример настройки для realm synapse (legacy):
# В версиях < 0.3.0 присутствовали специфичные параметры для realm synapse, из которых строился атрибут ott:synapse:uri
# 1: value.serializer.signature.ott.synapse.uri.prefix = /cluster/
# 2: value.serializer.signature.ott.synapse.uri.publish.suffix = /publish
# В новых версиях эти параметры не используются, значение атрибута задается явно:
# value.serializer.signature.ott.request.add.resource = ott:synapse:uri#/cluster/${topic}/publish
# value.serializer.signature.ott.request.add.subject = ott:app#moduleId
# value.serializer.signature.ott.request.add.environment.realm = ott:realm#synapse
# 4) Настроить TLS для ОТТ-клиента
# 4.1) Пример настройки с сертификатами в jks-хранилище
# alias серверного сертификата ОТТ в truststore
value.serializer.signature.ott.service.cert.alias = ott-service
# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.serializer.signature.ott.certstore.type = JKS
# Путь до хранилища клиентских сертификатов
value.serializer.signature.ott.certstore.path = ssl/keystore.jks
# Путь до хранилища доверенных сертификатов
value.serializer.signature.ott.trust.store.path = ssl/truststore.jks
# Пароль от хранилища клиентских сертификатов
value.serializer.signature.ott.certstore.pwd = password
# Пароль от приватного ключа
value.serializer.signature.ott.certstore.private.key.pwd = password
# Пароль от хранилища доверенных сертификатов
value.serializer.signature.ott.trust.store.pwd = password
# Версия TLS протокола
value.serializer.signature.ott.client.tls = TLSv1.2
# 4.2) Пример настройки с сертификатами в формате pem
# alias серверного сертификата отт в truststore
value.serializer.signature.ott.service.cert.alias = ott-service
# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.serializer.signature.ott.certstore.type = PEM
# Сертификат сервиса ОТТ
value.serializer.signature.ott.service.crt = ott-service.pem
# Сертификат УЦ, подписавшего сертификат сервиса ОТТ
value.serializer.signature.ott.service.tls.crt = ott-service-tls.pem
# Сертификат клиента
value.serializer.signature.ott.client.crt = ott-client.pem
# Приватный ключ клиента
value.serializer.signature.ott.client.private.key = ott-ckient-private-key.key
# Версия TLS протокола
value.serializer.signature.ott.client.tls = TLSv1.2
# 5) ОПЦИОНАЛЬНО Настроить имена системных заголовков
# Имя заголовка с подписью сообщения
# value.serializer.signature.header = signature
# Префикс для имен системных заголовков
# value.serializer.signature.attributes.prefix = signature.
# Имя заголовка с токеном
# value.serializer.signature.ott.token.header = token
Пример конфигурации Kafka Consumer#
# 1) Подключить десериализатор
value.deserializer = ru.sbt.ss.kafka.serialization.OttSignatureDeserializer
value.deserializer.delegate = org.apache.kafka.common.serialization.StringSerializer
# 2) Настроить ОТТ-клиент
# Id модуля (также alias клиентского сертификата)
value.deserializer.signature.ott.module.id = ott-test-moduleA
# Хосты ОТТ
value.deserializer.signature.ott.service.url = https://ott-service.otts-std-ift2.ingress.apps.ocp.ift-02.solution.sbt
value.deserializer.signature.ott.service.hosts = { IP_ADDRESS }
# Realm авторизации
value.deserializer.signature.ott.authz.realm = mmt
# ОПЦИОНАЛЬНО закрывать ОТТ-клиент при закрытии kafka-клиента
# Библиотека ott кеширует клиенты по module.id, но не удаляет их из кэша при закрытии,
# из-за чего новый (или перезапустившийся) kafka-клиент может получить из кеша уже закрытый ott-клиент
# По умолчанию ott-клиент не закрывается
# value.deserializer.signature.ott.close.client = false
# 3) Настроить параметры запроса к ОТТ-сервису
# Атрибуты `ott.request.*` используются для проверки токена в заголовке сообщения
# Атрибуты `ott.request.add.*` должны быть указаны как у producer для корректной валидации параметров в подписи события
# Все атрибуты, указанные в конфигурации считаются обязательными, если они отсутствуют в токене - токен не пройдет валидацию и подпись будет считать невалидной.
# ОПЦИОНАЛЬНО настроить разделитель строки составного параметра
# value.deserializer.signature.ott.attribute.delim = #
# 3.1) Пример настройки для realm mmt (целевой на момент написания документации):
value.deserializer.signature.ott.request.add.subject = urn:sbrf:names:pprb:1.0:module:id#ott-test-moduleA
value.deserializer.signature.ott.request.add.resource = urn:sbrf:names:pprb:1.0:api:interface:fullname#${topic}
value.deserializer.signature.ott.request.add.environment.realm = ott:realm#mmt
# 3.2) Пример настройки для realm synapse (legacy):
# В версиях < 0.3.0 присутствовали специфичные параметры для realm synapse, из которых строился атрибут ott:synapse:uri:
# 1: value.deserializer.signature.ott.synapse.uri.prefix = /cluster/
# 2: value.deserializer.signature.ott.synapse.uri.publish.suffix = /publish
# В новых версиях эти параметры не используются, значение атрибута задается явно:
# value.deserializer.signature.ott.request.add.resource = ott:synapse:uri#/cluster/${topic}/publish
# value.deserializer.signature.ott.request.add.subject = ott:app#moduleId
# value.deserializer.signature.ott.request.add.environment.realm = ott:realm#synapse
# 4) Настроить TLS для ОТТ-клиента
# 4.1) Пример настройки с сертификатами в jks-хранилище
# alias серверного сертификата отт в truststore
value.deserializer.signature.ott.service.cert.alias = ott-service
# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.deserializer.signature.ott.certstore.type = JKS
# Путь до хранилища клиентских сертификатов
value.deserializer.signature.ott.certstore.path = ssl/keystore.jks
# Путь до хранилища доверенных сертификатов
value.deserializer.signature.ott.trust.store.path = ssl/truststore.jks
# Пароль от хранилища клиентских сертификатов
value.deserializer.signature.ott.certstore.pwd = password
# Пароль от приватного ключа
value.deserializer.signature.ott.certstore.private.key.pwd = password
# Пароль от хранилища доверенных сертификатов
value.deserializer.signature.ott.trust.store.pwd = password
# Версия TLS протокола
value.deserializer.signature.ott.client.tls = TLSv1.2
# 4.2) Пример настройки с сертификатами в формате pem
# alias серверного сертификата отт в truststore
value.deserializer.signature.ott.service.cert.alias = ott-service
# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.deserializer.signature.ott.certstore.type = PEM
# Сертификат сервиса ОТТ
value.deserializer.signature.ott.service.crt = ott-service.pem
# Сертификат УЦ, подписавшего сертификат сервиса ОТТ
value.deserializer.signature.ott.service.tls.crt = ott-service-tls.pem
# Сертификат клиента
value.deserializer.signature.ott.client.crt = ott-client.pem
# Приватный ключ клиента
value.deserializer.signature.ott.client.private.key = ott-ckient-private-key.key
# Версия TLS протокола
value.deserializer.signature.ott.client.tls = TLSv1.2
# 5) ОПЦИОНАЛЬНО Настроить удаление системных заголовков из сообщения
# value.deserializer.signature.remove.headers = false
# 6) ОПЦИОНАЛЬНО Настроить режим работы консюмера при ошибках (по умолчанию failOnValue)
# value.deserializer.signature.mode = failOnValue
# 7) ОПЦИОНАЛЬНО Настроить имена системных заголовков
# Имя заголовка с подписью сообщения
# value.deserializer.signature.header = signature
# Префикс для имен системных заголовков
# value.deserializer.signature.attributes.prefix = signature.
# Имя заголовка с токеном
# value.deserializer.signature.ott.token.header = token