Kafka-ott-signature-serde#

Описание модуля#

Реализует интерфейсы Serializer/Deserializer, предназначен для получения OTT токенов (One Time Token), а также для подписи/проверки подписи сообщений с помощью OTT.

Использует библиотеку sbp.com.sbt.ott:ott-jsonrpc-hb-client:4.3.1-157.

Логика работы:

  1. Producer-signature-plugin (serializer) запрашивает токен у централизованного сервиса OTT.

  2. Producer-signature-plugin (serializer) сериализует сообщение сериализатором, указанным с помощью настройки *.serializer.delegate.

  3. Producer-signature-plugin (serializer) подписывает тело сообщения (record.value()) с помощью токена.

  4. Producer-signature-plugin (serializer) добавляет подпись (в Base64) и токен (в формате jks) в заголовки сообщения.

  5. Consumer-signature-plugin (deserializer) получает токен (jws) из заголовка сообщения и проверяет его валидность.

  6. Consumer-signature-plugin (deserializer) получает подпись из заголовка сообщения и проверяет его валидность с помощью токена.

  7. Consumer-signature-plugin (deserializer) десериализует сообщение десериализатором, указанным с помощью настройки *.deserializer.delegate.

  8. В случае, если сообщение не прошло проверку подписи при получении (consumer.poll()) поведение настраивается параметром *.deserializer.signature.mode:

  • nullValueWithHeader (используется по умолчанию) – клиент получит сообщение, которое содержит null вместо value и дополнительный заголовок validation.error c сообщением об ошибке;

  • failOnConsume – метод consumer.poll() выбросит исключение, клиент не получит ни одного сообщения из пачки.

Подключение к Kafka-клиентам#

  1. Добавить актуальную версию перехватчика в зависимости проекта.

  2. Сконфигурировать kafka-клиенты в соответствии с примерами.

Подключение к Spring Kafka#

Для настройки клиента Kafka в Spring необходимо выполнить следующие шаги:

1. Настройка KafkaTemplate и ProducerFactory#

  @Configuration
  public static class JsonConfigConsumer {

    @Bean
    public ProducerFactory<String, String> producerFactorySettingsFromFile() {
    
      /** 
        Указываем путь до параметров `PATH_TO_PRODUCER_VALID_PROPERTIES`, дополнительные параметры можно передать через `Map.of("nameParam, valueParam")`
      */
      final var producerProperties = loadProperties(PATH_TO_PRODUCER_VALID_PROPERTIES, Map.of());

      return new DefaultKafkaProducerFactory<>(propertiesToMapConverter(producerProperties));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplateSettingsFromFile(ProducerFactory<String, String> producerFactorySettingsFromFile) {
      return new KafkaTemplate<>(producerFactorySettingsFromFile);
    }
  
    private Properties loadProperties(String path, Map<String, String> additionalSettings) {
      try {
        return FileUtils.loadPropertiesFromClasspath(path, additionalSettings);
      } catch (IOException e) {
      throw new RuntimeException(e);
      }
    }
  }

2. Настройка Listeners и ConsumerFactory#

  @Configuration
  public static class JsonConfigConsumer {

        @Bean
        public ConsumerFactory<String, String> consumerFactorySettingsFromFile() {
            /** 
              Указываем путь до параметров `PATH_TO_CONSUMER_VALID_PROPERTIES`, дополнительные параметры можно передать через `Map.of("nameParam, valueParam")`
            */
            Properties consumerProperties = loadProperties(PATH_TO_CONSUMER_VALID_PROPERTIES, Map.of());
            return new DefaultKafkaConsumerFactory<>(propertiesToMapConverter(consumerProperties));
        }

        @Bean
        public KafkaListenerContainerFactory<
                ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactorySettingsFromFile(
                ConsumerFactory<String, String> consumerFactorySettingsFromFile,
                ErrorHandler createErrorHandler
        ) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactorySettingsFromFile);

            return factory;
        }
  }

3. Настройка Listener#

  @EnableKafka
  @Component
  public class ListenersKafkaConsumerJson {
  
    @KafkaListener(
            id = "id-1",
            topics = "NAME_TOPIC",
            groupId = "group-id",
            
            /** 
              Необходимо указать имя bean фабрики с нужными параметрами.
            */
            containerFactory = "kafkaListenerContainerFactorySettingsFromFile",
    )
    private void listenerConfigurationFile(String data) {
      try {
        JsonConsumerSpringTest.RESPONSE_QUEUE.put(data);
      } catch (InterruptedException e) {
            throw new RuntimeException(e);
      }
    }
  } 

4. Настройка ErrorHandler (опционально)#

Чтобы задать логику обработки ошибок, необходимо при создании KafkaListenerContainerFactory задать собственную реализацию ErrorHandler, затем передать в метод setErrorHandler(errorHandler).

Пример реализации ErrorHandler:

new ErrorHandler() {
  @Override
  public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
     throw e;
  }
}

Конфигурация ОТТ-запроса#

Для OttSignatureSerializer атрибуты ott.request.* отправляются на сервер ОТТ для авторизации запроса на создание токена. Некоторые параметры попадают в токен в виде token claims.

Для OttSignatureDeserializer атрибуты ott.request.* используются для проверки токена в заголовке сообщения. Все атрибуты, указанные в конфигурации считаются обязательными, если они отсутствуют в токене - токен не пройдет валидацию и подпись будет считать невалидной.

Настройки ОТТ-запроса ott.request.* транслируются в вызовы методов билдера com.sbt.ott.api.client.TokenRequestBuilder ОТТ-клиента.

Атрибуты ott.request.add.* в конфигурации consumer должны быть указаны как у producer для корректной валидации параметров в подписи события.

Строковые параметры#

Параметр конфигурации

Метод билдера

ott.request.invoker = testInvoker

builder.invoker("testInvoker")

ott.request.receiver = testReceiver

builder.invoker("testReceiver")

ott.request.url = /test/url

builder.url("/test/url")

ott.request.domain = testDomain

builder.domain("testDomain")

ott.request.action = testAction

builder.action("testAction")

Фиксированные составные параметры (для подраздела Создание атрибутов)#

Могут состоять из опциональной категории, имени параметра и его значения, разделитель по умолчанию #.

!ВАЖНО Атрибуты ott.request.add.* в конфигурации consumer должны быть указаны как у producer для корректной валидации параметров в подписи события.

Параметр

Метод

ott.request.add.subject = testSubjectAttributeName#testSubjectAttributeValue

builder.addSubject(BasicAttribute("testSubjectAttributeName", "testSubjectAttributeValue"))

ott.request.add.recipient.subject = testRecipientAttributeName#testRecipientAttributeValue

builder.addRecipientSubject(BasicAttribute("testRecipientAttributeName", "testRecipientAttributeValue"))

ott.request.add.action = testActionAttributeName#testActrionAttributeValue

builder.addAction(BasicAttribute("testActionAttributeName", "testActionAttributeValue"))

ott.request.add.resource = testResourceAttributeName#testResourceAttributeValue

builder.addResource(BasicAttribute("testResourceAttributeName", "testResourceAttributeValue"))

Прочие составные параметры (для подраздела Создание атрибутов)#

Задаются аналогично фиксированным составным параметрам, можно задать несколько параметров из каждой категории ott.request.add.environment или ott.request.add.attribute.

!ВАЖНО Атрибуты ott.request.add.* в конфигурации consumer должны быть указаны как у producer для корректной валидации параметров в подписи события.

Параметр

Метод

ott.request.add.environment.* = testEnvAttributeName#testEnvAttributeValue

builder.addEnvironment(BasicAttribute("testEnvAttributeName", "testEnvAttributeValue"))

ott.request.add.attribute.* = testAttributeName#testAttributeValue

builder.addAttribute(BasicAttribute("testAttributeName", "testAttributeValue"))

Имена параметров после ott.request.add.environment. и ott.request.add.attribute. игнорируются, но должны быть уникальны для корректной передачи всех параметров в конфигурацию.

Пример конфигурации прочих параметров:

ott.request.add.environment.env1 = testEnvAttributeName1#testEnvAttributeValue1
ott.request.add.environment.env2 = testEnvAttributeName2#testEnvAttributeValue2
ott.request.add.attribute.1 = testAttributeName1#testAttributeValue1
ott.request.add.attribute.2 = testAttributeName2#testAttributeValue2

Создание атрибутов#

// Для составного параметра со значением ott.request.add.subject = subjectCategory#subjectId#subjectValue
var attribute = new BasicAttribute();

attribute.setCategory("subjectCategory");
attribute.setId("subjectId");
attribute.addValue("subjectValue");
attribute.setMandatory(true);

Добавление имени topic в параметр ОТТ-запроса#

Для добавления имени topic в параметр запроса можно использовать ${topic}:

ott.request.add.resource = testResourceAttributeName#${topic}

Примеры конфигурации#

Настройки Kafka Producer и Kafka Consumer конфигурируются там же, где и настройки Kafka-клиента.

Пример конфигурации Kafka Producer#

# 1) Подключить сериализатор
value.serializer = ru.sbt.ss.kafka.serialization.OttSignatureSerializer
value.serializer.delegate = org.apache.kafka.common.serialization.StringSerializer

# 2) Настроить ОТТ-клиент

# Id модуля (также alias клиентского сертификата)
value.serializer.signature.ott.module.id = ott-test-moduleA

# Хосты ОТТ
value.serializer.signature.ott.service.url = https://ott-service.otts-std-ift2.ingress.apps.ocp.ift-02.solution.sbt
value.serializer.signature.ott.service.hosts = { IP_ADDRESS }

# Realm авторизации
value.serializer.signature.ott.authz.realm = mmt

# ОПЦИОНАЛЬНО закрывать ОТТ-клиент при закрытии kafka-клиента
# Библиотека ott кеширует клиенты по module.id, но не удаляет их из кэша при закрытии, 
# из-за чего новый (или перезапустившийся) kafka-клиент может получить из кеша уже закрытый ott-клиент
# По умолчанию ott-клиент не закрывается
# value.serializer.signature.ott.close.client = false

# 3) Настроить параметры запроса к ОТТ-сервису
# Атрибуты `*.ott.request.*` отправляются на сервер отт для авторизации запроса на создание токена. Некоторые параметры попадают в токен в виде token claims.

# ОПЦИОНАЛЬНО настроить разделитель строки составного параметра
# value.serializer.signature.ott.attribute.delim = #

# 3.1) Пример настройки для realm mmt (целевой на момент написания документации):
value.serializer.signature.ott.request.add.subject = urn:sbrf:names:pprb:1.0:module:id#ott-test-moduleA
value.serializer.signature.ott.request.add.resource = urn:sbrf:names:pprb:1.0:api:interface:fullname#${topic}
value.serializer.signature.ott.request.add.environment.realm = ott:realm#mmt

# 3.2) Пример настройки для realm synapse (legacy):
# В версиях < 0.3.0 присутствовали специфичные параметры для realm synapse, из которых строился атрибут ott:synapse:uri
# 1: value.serializer.signature.ott.synapse.uri.prefix = /cluster/
# 2: value.serializer.signature.ott.synapse.uri.publish.suffix = /publish
# В новых версиях эти параметры не используются, значение атрибута задается явно:
# value.serializer.signature.ott.request.add.resource = ott:synapse:uri#/cluster/${topic}/publish

# value.serializer.signature.ott.request.add.subject = ott:app#moduleId
# value.serializer.signature.ott.request.add.environment.realm = ott:realm#synapse

# 4) Настроить TLS для ОТТ-клиента

# 4.1) Пример настройки с сертификатами в jks-хранилище

# alias серверного сертификата ОТТ в truststore
value.serializer.signature.ott.service.cert.alias = ott-service

# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.serializer.signature.ott.certstore.type = JKS

# Путь до хранилища клиентских сертификатов
value.serializer.signature.ott.certstore.path = ssl/keystore.jks

# Путь до хранилища доверенных сертификатов
value.serializer.signature.ott.trust.store.path = ssl/truststore.jks

# Пароль от хранилища клиентских сертификатов
value.serializer.signature.ott.certstore.pwd = password

# Пароль от приватного ключа
value.serializer.signature.ott.certstore.private.key.pwd = password

# Пароль от хранилища доверенных сертификатов
value.serializer.signature.ott.trust.store.pwd = password

# Версия TLS протокола
value.serializer.signature.ott.client.tls = TLSv1.2

# 4.2) Пример настройки с сертификатами в формате pem

# alias серверного сертификата отт в truststore
value.serializer.signature.ott.service.cert.alias = ott-service

# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.serializer.signature.ott.certstore.type = PEM

# Сертификат сервиса ОТТ
value.serializer.signature.ott.service.crt  = ott-service.pem

# Сертификат УЦ, подписавшего сертификат сервиса ОТТ
value.serializer.signature.ott.service.tls.crt = ott-service-tls.pem

# Сертификат клиента
value.serializer.signature.ott.client.crt = ott-client.pem

# Приватный ключ клиента
value.serializer.signature.ott.client.private.key = ott-ckient-private-key.key

# Версия TLS протокола
value.serializer.signature.ott.client.tls = TLSv1.2

# 5) ОПЦИОНАЛЬНО Настроить имена системных заголовков
# Имя заголовка с подписью сообщения
# value.serializer.signature.header = signature

# Префикс для имен системных заголовков
# value.serializer.signature.attributes.prefix = signature.

# Имя заголовка с токеном
# value.serializer.signature.ott.token.header = token

Пример конфигурации Kafka Consumer#

# 1) Подключить десериализатор
value.deserializer = ru.sbt.ss.kafka.serialization.OttSignatureDeserializer
value.deserializer.delegate = org.apache.kafka.common.serialization.StringSerializer

# 2) Настроить ОТТ-клиент

# Id модуля (также alias клиентского сертификата)
value.deserializer.signature.ott.module.id = ott-test-moduleA

# Хосты ОТТ
value.deserializer.signature.ott.service.url = https://ott-service.otts-std-ift2.ingress.apps.ocp.ift-02.solution.sbt
value.deserializer.signature.ott.service.hosts = { IP_ADDRESS }

# Realm авторизации
value.deserializer.signature.ott.authz.realm = mmt

# ОПЦИОНАЛЬНО закрывать ОТТ-клиент при закрытии kafka-клиента
# Библиотека ott кеширует клиенты по module.id, но не удаляет их из кэша при закрытии, 
# из-за чего новый (или перезапустившийся) kafka-клиент может получить из кеша уже закрытый ott-клиент
# По умолчанию ott-клиент не закрывается
# value.deserializer.signature.ott.close.client = false

# 3) Настроить параметры запроса к ОТТ-сервису
# Атрибуты `ott.request.*` используются для проверки токена в заголовке сообщения 
# Атрибуты `ott.request.add.*` должны быть указаны как у producer для корректной валидации параметров в подписи события
# Все атрибуты, указанные в конфигурации считаются обязательными, если они отсутствуют в токене - токен не пройдет валидацию и подпись будет считать невалидной.

# ОПЦИОНАЛЬНО настроить разделитель строки составного параметра
# value.deserializer.signature.ott.attribute.delim = #

# 3.1) Пример настройки для realm mmt (целевой на момент написания документации):
value.deserializer.signature.ott.request.add.subject = urn:sbrf:names:pprb:1.0:module:id#ott-test-moduleA
value.deserializer.signature.ott.request.add.resource = urn:sbrf:names:pprb:1.0:api:interface:fullname#${topic}
value.deserializer.signature.ott.request.add.environment.realm = ott:realm#mmt

# 3.2) Пример настройки для realm synapse (legacy):
# В версиях < 0.3.0 присутствовали специфичные параметры для realm synapse, из которых строился атрибут ott:synapse:uri:
# 1: value.deserializer.signature.ott.synapse.uri.prefix = /cluster/
# 2: value.deserializer.signature.ott.synapse.uri.publish.suffix = /publish
# В новых версиях эти параметры не используются, значение атрибута задается явно:
# value.deserializer.signature.ott.request.add.resource = ott:synapse:uri#/cluster/${topic}/publish

# value.deserializer.signature.ott.request.add.subject = ott:app#moduleId
# value.deserializer.signature.ott.request.add.environment.realm = ott:realm#synapse

# 4) Настроить TLS для ОТТ-клиента

# 4.1) Пример настройки с сертификатами в jks-хранилище

# alias серверного сертификата отт в truststore
value.deserializer.signature.ott.service.cert.alias = ott-service

# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.deserializer.signature.ott.certstore.type = JKS

# Путь до хранилища клиентских сертификатов
value.deserializer.signature.ott.certstore.path = ssl/keystore.jks

# Путь до хранилища доверенных сертификатов
value.deserializer.signature.ott.trust.store.path = ssl/truststore.jks

# Пароль от хранилища клиентских сертификатов
value.deserializer.signature.ott.certstore.pwd = password

# Пароль от приватного ключа
value.deserializer.signature.ott.certstore.private.key.pwd = password

# Пароль от хранилища доверенных сертификатов
value.deserializer.signature.ott.trust.store.pwd = password

# Версия TLS протокола
value.deserializer.signature.ott.client.tls = TLSv1.2

# 4.2) Пример настройки с сертификатами в формате pem

# alias серверного сертификата отт в truststore
value.deserializer.signature.ott.service.cert.alias = ott-service

# Тип хранилища сертифкатов (по умолчанию pkcs12)
value.deserializer.signature.ott.certstore.type = PEM

# Сертификат сервиса ОТТ
value.deserializer.signature.ott.service.crt  = ott-service.pem

# Сертификат УЦ, подписавшего сертификат сервиса ОТТ
value.deserializer.signature.ott.service.tls.crt = ott-service-tls.pem

# Сертификат клиента
value.deserializer.signature.ott.client.crt = ott-client.pem

# Приватный ключ клиента
value.deserializer.signature.ott.client.private.key = ott-ckient-private-key.key

# Версия TLS протокола
value.deserializer.signature.ott.client.tls = TLSv1.2

# 5) ОПЦИОНАЛЬНО Настроить удаление системных заголовков из сообщения
# value.deserializer.signature.remove.headers = false

# 6) ОПЦИОНАЛЬНО Настроить режим работы консюмера при ошибках (по умолчанию failOnValue)
# value.deserializer.signature.mode = failOnValue

# 7) ОПЦИОНАЛЬНО Настроить имена системных заголовков
# Имя заголовка с подписью сообщения
# value.deserializer.signature.header = signature

# Префикс для имен системных заголовков
# value.deserializer.signature.attributes.prefix = signature.

# Имя заголовка с токеном
# value.deserializer.signature.ott.token.header = token