Настройки конфигурационного файла adapter.conf#

Все конфигурационные настройки EVTA задаются в файле adapter.conf, который необходимо заполнить до запуска процесса установки EVTA. При заполнении конфигурационного файла необходимо ориентироваться на указанные комментарии.

В таблице представлены параметры настройки, задаваемые в файле adapter.conf. Пример заполненного файла приведен ниже:

Параметр

Описание

threadCount

Количество потоков обработки сообщений, по умолчанию равно количеству ядер в системе

timeout

Общий таймаут на блокирующие операции

batchSize

Размер сообщений, обрабатываемый за одну итерацию

retries

Количество попыток запуска EVTA

retryInterval

Интервал между попытками запуска EVTA (мс)

retryMultiplier

Множитель каждой следующей попытки

healthCheckHost

HTTP host для получения HealthCheck

healthCheckPort

HTTP порт для получения HealthCheck, если значение не передано или равно -1, то HTTP-сервер не поднимается

consumer

Параметры получателя

producer

Параметры отправителя

consumerType

Тип получателя, по умолчанию kafka

producerType

Тип отправителя, по умолчанию kafka

eventDiscoveryEnabled

Включение Event Discovery, по умолчанию true

ed

Параметры подключения к Event Discovery

ed: url

Адрес подключения к ETCD

ed: authority

CN сертификата сервера ETCD

ed: ssl.key.password

Пароль от приватного ключа

ed: ssl.keystore.location

Путь до хранилища приватного ключа

ed: ssl.keystore.password

Пароль от хранилища приватного ключа

ed: ssl.keystore.type

Тип хранилища, по умолчанию JKS

ed: ssl.truststore.location

Путь до хранилища доверенных сертификатов

ed: ssl.truststore.password

Пароль от хранилища доверенных сертификатов

ed: ssl.truststore.type

Тип хранилища, по умолчанию JKS

ed: ssl.protocol

Версия протокола TLS, по умолчанию TLSv1.2

ed: ssl.conscrypt

Признак использования библиотеки Conscrypt, по умолчанию true

processors

Настройки дополнительных обработчиков сообщений

egressEnabled

Тип используемого egress, поддерживаются 2 типа: REST и gRPC (none, если не используется)

egressSubscribe/egressPublish

Настройки egress для подписки и публикации соответственно

egressSubscribe/egressPublish: isHttp2

Значение по умолчанию false

egressSubscribe/egressPublish: host

Хост, на котором будет запущен сервер

egressSubscribe/egressPublish: port

Порт, на котором будет запущен сервер

egressSubscribe/egressPublish: secret

Ключ для расшифровки зашифрованных паролей

egressSubscribe/egressPublish: autoCommit

Значение по умолчанию false, отвечает за то, необходимо ли подтверждение получения сообщения со стороны клиента для его коммита

egressSubscribe/egressPublish: useStorageTime

Значение по умолчанию true, отвечает за то, необходима ли проверка максимального времени последнего подключения

egressSubscribe/egressPublish: delay

Значение по умолчанию 60000, периодичность проверки (мс) (используется, если установлен useStorageTime)

egressSubscribe/egressPublish: storageTime

Значение по умолчанию 30000, максимальное время последнего подключения (мс) (используется, если установлен useStorageTime)

egressSubscribe/egressPublish: needInfoAboutAcknowledge

Если флаг установлен, то ответы о публикации будут приходить только после подтверждения от транспорта, иначе сразу после отправки

egressSubscribe/egressPublish: ssl.white.list.path

Путь до файла со списком разрешенных для подключения к EVTA DN сертификатов (проверка происходит, если параметр указан)

egressSubscribe/egressPublish: charset

Кодировка сообщений для вычитки REST, по умолчанию UTF-8. В приоритете — значение из соответствующего заголовка вычитанного сообщения

egressSubscribe/egressPublish: ssl.key.password

Пароль от приватного ключа

egressSubscribe/egressPublish: ssl.keystore.location

Путь до хранилища приватного ключа

egressSubscribe/egressPublish: ssl.keystore.password

Пароль от хранилища приватного ключа

egressSubscribe/egressPublish: ssl.truststore.password

Пароль от хранилища доверенных сертификатов

egressSubscribe/egressPublish: ssl.protocol

Тип протокола SSL, который будет получен из vault. Значение по умолчанию TLSv1.2

egressSubscribe/egressPublish: ssl.cipher.suites

Перечисление используемых шифров через запятую, по умолчанию – все доступные

egressSubscribe/egressPublish: ssl.keystore.type

Тип хранилища, по умолчанию JKS

egressSubscribe/egressPublish: ssl.truststore.type

Тип хранилища, по умолчанию JKS

egressSubscribe/egressPublish: ssl.endpoint.identification.algorithm

Алгоритм идентификации endpoint подключения для проверки имени host сервера с использованием сертификата сервера. Значение по умолчанию https

egressSubscribe/egressPublish: ssl.conscrypt

Использование библиотеки concrypt. Значение по умолчанию false

egressSubscribe/egressPublish: ssl.allowed.dn

Список разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.allowed.dn.file

Расположение файла со списком разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.vault.tls.key.password

Пароль от приватного ключа хранилища

egressSubscribe/egressPublish: ssl.vault.tls.keystore.location

Путь до keystore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.keystore.password

Пароль от keystore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.truststore.location

Путь до truststore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.truststore.password

Пароль от truststore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.protocol

Для хранилища: тип протокола SSL, который будет получен из vault. Значение по умолчанию TLSv1.2

egressSubscribe/egressPublish: ssl.vault.tls.keystore.type

Формат файла keystore хранилища. Значение по умолчанию JKS

egressSubscribe/egressPublish: ssl.vault.tls.truststore.type

Формат файла truststore хранилища. Значение по умолчанию JKS

egressSubscribe/egressPublish: ssl.vault.tls.endpoint.identification.algorithm

Для хранилища: алгоритм идентификации endpoint подключения для проверки имени сервера с использованием сертификата сервера. Значение по умолчанию https

egressSubscribe/egressPublish: ssl.vault.tls.conscrypt

Для хранилища: использование библиотеки conscrypt. Значение по умолчанию false

egressSubscribe/egressPublish: ssl.vault.tls.allowed.dn

Для хранилища: список разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.vault.tls.allowed.dn.file

Для хранилища: расположение файла со списком разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.vault.address

HashiCorp Vault URL

egressSubscribe/egressPublish: ssl.vault.namespace

Пространство имен HashiCorp Vault

egressSubscribe/egressPublish: ssl.vault.auth.type

Тип аутентификации. Доступные значения:approle, token, password, cert. Значение по умолчанию approle

egressSubscribe/egressPublish: ssl.vault.auth.mount

Путь монтирования аутентификации

egressSubscribe/egressPublish: ssl.vault.auth.role.id

Role ID для типа аутентификации approle

egressSubscribe/egressPublish: ssl.vault.auth.secret.id

Secret ID для типа аутентификации approle

egressSubscribe/egressPublish: ssl.vault.auth.token

Токен для типа аутентификации token

egressSubscribe/egressPublish: ssl.vault.auth.username

Имя пользователя для аутентификации типа password

egressSubscribe/egressPublish: ssl.vault.auth.password

Пароль для аутентификации типа password

egressSubscribe/egressPublish: ssl.vault.tls.enable

Наличие защищенного соединения с HashiCorp Vault. Значение по умолчанию true

egressSubscribe/egressPublish: ssl.vault.namespace

Vault namespace

egressSubscribe/egressPublish: ssl.vault.pki.mount

Путь монтирования Vault PKI API. Значение по умолчанию pki

egressSubscribe/egressPublish: ssl.vault.pki.role.name

Имя роли в PKI Engine

egressSubscribe/egressPublish: ssl.vault.pki.common.name

Common name для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.crl.enable

Значение по умолчанию true, параметр для управления механизмом проверки сертификата по списку отозванных

egressSubscribe/egressPublish: ssl.vault.pki.email

Email для получения сертификата из HashiCorp Vault

egressSubscribe/egressPublish: ssl.vault.pki.alt.names

Alternative names для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.alt.ip

Alternative ip для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.ttl

TTL для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.csr

CSR для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.trust.only

Загружать только trust store. Значение по умолчанию false

egressSubscribe/egressPublish: ssl.vault.secret.path

Секретный путь для ключа, keystore и truststore паролей

egressSubscribe/egressPublish: ssl.vault.secret.key

Secret key для пароля ключа

egressSubscribe/egressPublish: ssl.vault.secret.keystore

Secret key для keystore пароля

egressSubscribe/egressPublish: ssl.vault.secret.truststore

Secret key для truststore пароля

egressSubscribe/egressPublish: ssl.vault.before.expire

Время до даты экспирации для обновления сертификатов. Значение по умолчанию PT0S

egressSubscribe/egressPublish: ssl.vault.alias.ca

Псевдоним для CA сертификта в truststore

egressSubscribe/egressPublish: ssl.vault.alias.key

Псевдоним для ключа и сертификата в keystore

egressSubscribe/egressPublish: ssl.vault.engine.version

Версия API для key-value машины. Значение по умолчанию 2

egressSubscribe/egressPublish: ssl.vault.retries

Количество повторных попыток запроса. Значение по умолчанию 5

egressSubscribe/egressPublish: ssl.vault.retry.interval

Интервал между попытками (мс). Значение по умолчанию 500

egressSubscribe/egressPublish: ssl.vault.timeout

Таймаут запроса (с). Значение по умолчанию 3

urlConfig

Путь до файла с настройками endpoint, используется для egress

Пример заполненного файла adapter.conf

{
   "batchSize": "1000",
   "egressEnabled":"rest",
   "egressPublish": {
       "ssl.vault.tls.keystore.location": "path/to/vaultclient.jks"
       "ssl.vault.tls.keystore.password":"***",
       "ssl.vault.tls.key.password":"***",
       "ssl.vault.tls.truststore.location": "path/to/vaultclient.jks"
       "ssl.vault.tls.truststore.password":"***",
       "ssl.protocol":"TLSv1.2",
       "ssl.engine.factory.class":"ru.sbt.ss.kafka.VaultSslEngineFactory",
       "ssl.vault.auth.type":"CERTIFICATE",
       "ssl.keystore.location": "path/to/client.test1.jks",
       "ssl.vault.alias.key": "test-1",
       "ssl.truststore.location": "path/to/client.test1.jks",
       "ssl.vault.address":"https://host:port/",
       "ssl.vault.namespace":"",
       "ssl.vault.pki.common.name":"KafkaClient1",
       "ssl.vault.pki.email":"example@mail.com",
       "ssl.vault.pki.alt.names":"localhost",
       "ssl.vault.pki.alt.ip":"ip",
       "ssl.vault.secret.path":"path/to/certstore",
       "ssl.vault.secret.key":"key",
       "ssl.vault.secret.keystore":"keystore",
       "ssl.vault.secret.truststore":"truststore",
       "ssl.vault.engine.version":"1",
       "ssl.vault.pki.role.name":"test1",
       "ssl.vault.pki.crl.enable": "true",

       "host": "localhost",
       "port": 9002,
       "ssl.white.list.path":"path/to/allowed-dn.txt",
       "needInfoAboutAcknowledge": true,
       "isHttp2":false
      }
   "egressSubscribe": {
      "ssl.vault.tls.keystore.location": "path/to/certs/vaultclient.jks"
      "ssl.vault.tls.keystore.password":"***",
      "ssl.vault.tls.key.password":"***",
      "ssl.vault.tls.truststore.location": "path/to/certs/vaultclient.jks"
      "ssl.vault.tls.truststore.password":"***",
      "ssl.protocol":"TLSv1.2",
      "ssl.engine.factory.class":"ru.sbt.ss.kafka.VaultSslEngineFactory",
      "ssl.vault.auth.type":"CERTIFICATE",
      "ssl.keystore.location": "path/to/certs/client.test1.jks",
      "ssl.vault.alias.key": "test-1",
      "ssl.truststore.location": "path/to/certs/client.test1.jks",
      "ssl.vault.address":"https://host:port/",
      "ssl.vault.namespace":"",
      "ssl.vault.pki.common.name":"KafkaClient1",
      "ssl.vault.pki.email":"example@mail.com",
      "ssl.vault.pki.alt.names":"localhost",
      "ssl.vault.pki.alt.ip":"ip",
      "ssl.vault.secret.path":"path/to/certstore",
      "ssl.vault.secret.key":"key",
      "ssl.vault.secret.keystore":"keystore",
      "ssl.vault.secret.truststore":"truststore",
      "ssl.vault.engine.version":"1",
      "ssl.vault.pki.role.name":"test1",
      "ssl.vault.pki.crl.enable": "true",

      "host": "localhost",
      "port": 9003,
      "ssl.white.list.path":"path/to/allowed-dn.txt",
      "charset": "UTF-8",
      "isHttp2":false
     }
   "ed": {
       "url": "host:port",
       "authority": "DN cert",
       "ssl.keystore.location":"path/to/grpc-adapter.jks",
       "ssl.keystore.password":"***",
       "ssl.truststore.location":"path/to/grpc-adapter.jks",
       "ssl.truststore.password":"***",
       "ssl.protocol":"TLSv1.2",
       "ssl.conscrypt": false

   },
   "urlConfigPath":"path/to/urlConfig.json",
   "eventDiscoveryEnabled": false,
   "healthCheckPort": "port",
   "brokerWhiteListPath": "path/to/allowed-dn2.txt",
   "processors": {
       "log": {
           "format": "auto",
           "level": "info"
       }
   },
   "producer": {
       "properties": "path/to/prop.properties",
       "topic": "testTopic"
   },
   "producerType": "kafka",
   "retries": "3",
   "threadCount": 1,
   "timeout": "1000"
}

Поддерживаемые типы транспорта для параметров Сonsumer и Producer:#

  • kafka — Platform V Corax/Apache Kafka;

  • artemis — брокер Artemis MQ;

  • ibm_mq — брокер IBM MQ;

  • rabbit — брокер RabbitMQ.

Функционал, связанный с IBM MQ, не дорабатывается.

В дальнейшем планируется добавление других типов транспорта.

Параметры получателя и отправителя состоят из 2-х частей:

  1. Параметры, специфичные для EVTA, задаются в том же объекте Consumer или Producer.

  2. Параметры, специфичные для библиотеки транспорта, передаются в отдельном файле через параметр properties объектов Consumer или Producer.

Возможные варианты настроек Consumer и Producer указаны в таблице:

Тип брокера

Platform V Corax/Apache Kafka

artemis

ibm_mq

rabbit

Consumer

topic — имя входящего topic
deadLetter — topic ошибочных сообщений, опционален
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами библиотеки Apache Kafka
eventUrl — адрес события в ETCD, параметры из ETCD перезаписывают параметры из *.properties

queue — имя входящей очереди
deadLetter — очередь ошибочных сообщений, опционален
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами, содержащего:
connectors — список брокеров для подключения, в формате host:port;
connectorConfigs — общие параметры к org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
factory — параметры к org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
sslFactoryConfigPath — путь до конфигурационного файла с настройками подключения к HashiCorp Vault
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию ""
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально

queue — имя входящей очереди
deadLetter — очередь ошибочных сообщений, опционален
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами к com.ibm.mq.jms.MQConnectionFactory, параметр transportType игнорируется и всегда имеет значение 1
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию ""
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально,
keyStorePath — путь до хранилища приватного ключа
keyStorePassword — пароль от хранилища приватного ключа
trustStorePath — путь до хранилища доверенных сертификатов
trustStorePassword — пароль от хранилища доверенных сертификатов
protocol — версия протокола TLS, по умолчанию TLSv1.2
storeType — тип хранилища, по умолчанию JKS
conscrypt — признак использования библиотеки Conscrypt, по умолчанию true

properties — путь до файла с параметрами, содержащего:
uri — URI AMQP,опционально;
user — имя пользователя для подключения к брокеру, опционально;
password — пароль пользователя, опционально;
vhost — виртуальный host, используемый при подключении к брокеру, опционально;
recoveryInterval — сколько времени ждать автоматическое восстановление перед попыткой повторного подключения (мс), опционально;
sasl — при значении external устанавливает external конфигурацию sasl, опционально;
nodes — массив известных адресов брокера, перечисленных через запятую (host1:port1,host2:port2);
ssl.key.password — пароль от приватного ключа;
ssl.keystore.location — путь до хранилища приватного ключа;
ssl.keystore.password — пароль от хранилища приватного ключа;
ssl.keystore.type — тип хранилища, по умолчанию JKS;
ssl.truststore.location — путь до хранилища доверенных сертификатов;
ssl.truststore.password — пароль от хранилища доверенных сертификатов;
ssl.truststore.type — тип хранилища, по умолчанию JKS;
ssl.protocol — версия протокола TLS, по умолчанию TLSv2;
prefetchSize — максимальное количество сообщений, которые будет доставлять сервер, опционально. Значение 0, если не ограничено;
receiveMode — тип подписки, может принимать значения subscribe и poll, по умолчанию subscribe;
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию "";
queue — имя очереди, по умолчанию input;
timeout timeout блокирующих операций, по умолчанию 1000;
deadLetterExchange — точка обмена для ошибочных сообщений, опционален;
deadLetterRoutingKey — ключ для ошибочных сообщений, опционален;
deadLetterPersistence — режим персистентной отправки для ошибочных сообщений, опционален;
needToConvertToUtf8 — необходимо ли приводить вычитанное сообщение в кодировку UTF-8 (значение изначальной кодировки берется из ContentEncoding, вычитанного сообщения), по умолчанию false

Producer

topic — имя исходящего topic
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
batchSize — размер сообщений, обрабатываемый за одну итерацию, опционально
queryPartitionsInterval — интервал опроса topic на количество доступных партиций в миллисекундах
partitionerType — тип алгоритма выбора партиции при отправке сообщения, доступные значения:
RoundRobin — равномерно распределяет сообщения по партициям, значение по умолчанию;
MurMur2 — вычисляет номер партиции на основе hash функции MurMur2 от ключа или тела сообщения;
header — вычисляет партицию на основе значения заголовка сообщения с именем из параметра partitionerHeader;
random — номер партиции выбирается случайным образом;
properties — путь до файла с параметрами библиотеки Apache Kafka
eventUrl — адрес события в Event Discovery, параметры из Event Discovery перезаписывают параметры из properties

queue — имя входящей очереди,
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами, содержащего:
connectors — список брокеров для подключения, в формате host:port;
connectorConfigs — общие параметры к org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
factory — параметры к org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
sslFactoryConfigPath — путь до конфигурационного файла с настройками подключения к HashiCorp Vault;
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию "";
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально

queue — имя входящей очереди
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами к com.ibm.mq.jms.MQConnectionFactory, параметр transportType игнорируется и всегда имеет значение 1
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию ""
messageType — тип исходящего сообщения, значения:
byte — бинарное сообщение, по умолчанию;
text — текстовое сообщение
messageEncoding — кодировка по умолчанию входного бинарного сообщения, если не задан заголовок JMS_IBM_Character_Set
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально
keyStorePath — путь до хранилища приватного ключа
keyStorePassword — пароль от хранилища приватного ключа
trustStorePath — путь до хранилища доверенных сертификатов
trustStorePassword — пароль от хранилища доверенных сертификатов
protocol — версия протокола TLS, по умолчанию TLSv1.2
storeType — тип хранилища, по умолчанию JKS
conscrypt — признак использования библиотеки Conscrypt, по умолчанию true

properties — путь до файла с параметрами, содержащего:
uri — URI AMQP, опционально;
user — имя пользователя для подключения к брокеру, опционально;
password — пароль пользователя, опционально;
vhost — виртуальный host, используемый при подключении к брокеру, опционально;
recoveryInterval — сколько времени ждать автоматическое восстановление перед попыткой повторного подключения (мс), опционально;
sasl — при значении external устанавливает external конфигурацию sasl, опционально;
nodes — массив известных адресов брокера, перечисленных через запятую (host1:port1,host2:port2);
ssl.key.password — пароль от приватного ключа;
ssl.keystore.location — путь до хранилища приватного ключа;
ssl.keystore.password — пароль от хранилища приватного ключа;
ssl.keystore.type — тип хранилища, по умолчанию JKS;
ssl.truststore.location — путь до хранилища доверенных сертификатов;
ssl.truststore.password — пароль от хранилища доверенных сертификатов;
ssl.truststore.type — тип хранилища, по умолчанию JKS;
ssl.protocol — версия протокола TLS, по умолчанию TLSv1.2;
prefetchSize — максимальное количество сообщений, которые будет доставлять сервер, опционально. Значение 0, если не ограничено;
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию "";
publishAck — ждать ли подтверждения получения от rabbit перед подтверждением сообщения, по умолчанию false;
timeout — timeout блокирующих операций, по умолчанию 1000;
exchange — точка обмена для отправки сообщений;
routingKey — ключ для отправки сообщений;
persistence режим персистентной отправки.

Пример конфигурации kafka-kafka#

В данном случае EVTA используется как перекладчик из одного topic EVTD в другой.

Файл adapter.conf:

{
...
  batchSize: 1000
  timeout: 10000
  threadCount: 2
  healthCheckPort: 8081
  consumer: {
    topic: "input"
    properties: "conf/consumer.properties"
  }
  producer: {
    topic: "output"
    bufferSize: 1000
    queryPartitionsInterval: 5000
    properties: "conf/producer.properties"
  
  }
...
}

Файл consumer.properties:

bootstrap.servers = "host.docker.internal:9092"
client.id ="consumer.id"
group.id = "consumer.group.id"

Файл producer.properties:

bootstrap.servers = "host.docker.internal:9092"
client.id = "producer.id"
group.id = "producer.group.id"

Пример конфигурации mq-kafka#

В данном случае EVTA используется как шлюз между брокером очередей IBM MQ и брокером EVTD.

Файл adapter.conf:

{
  ...
  batchSize: 1000
  timeout: 10000
  retries: 1
  threadCount: 8
  healthCheckPort: 8087
  consumerType: "ibm_mq"
  consumer: {
    queue: "TEST"
    protocol: "TLSv1"
    conscrypt: false
    keyStorePath: "***"
    keyStorePassword: "*******"
    trustStorePath: "***"
    trustStorePassword: "*******"
    properties: "conf/mq.properties"
  }
  producer: {
    topic: "adapter-test-output"
    queryPartitionsInterval: 60000
    properties: "conf/kafka.properties"
  }
...
}

Файл mq.properties:

hostName = localhost
port = 1420
queueManager = PMS.PPRB.IM1
channel = CEP.SVRCONN
SSLCipherSuite = TLS_RSA_WITH_AES_128_CBC_SHA

Файл kafka.properties:

bootstrap.servers = "host:port"
group.id = vault-test
client.id = vault-client
security.protocol = SSL
ssl.endpoint.identification.algorithm = ""
ssl.keystore.location = /path/to/client.test.jks
ssl.truststore.location = /path/to/client.test.jks
ssl.protocol = TLSv1.2
ssl.engine.factory.class = ru.sbt.ss.kafka.VaultSslEngineFactory
ssl.vault.address = "https://host:port/"
ssl.vault.auth.type = APPROLE
ssl.vault.auth.role.id = <role id>
ssl.vault.auth.secret.id = <secret id>
ssl.vault.pki.role.name = <role name>
ssl.vault.pki.common.name = <common name>
ssl.vault.secret.path = <path to secret>
ssl.vault.secret.key = <key>
ssl.vault.secret.keystore = <keystore>
ssl.vault.secret.truststore = <truststore>
ssl.vault.engine.version = 1
ssl.vault.tls.truststore.location = /path/to/vaultclient.jks
ssl.vault.tls.truststore.password = <password>

Пример конфигурации kafka-artemis#

В данном случае EVTA используется как шлюз между брокером очередей SMBX и брокером EVTD.

Файл adapter.conf:

{...
  batchSize: 1000
  timeout: 10000
  threadCount: 2
  healthCheckPort: 8087
  producerType: artemis
  consumer: {
    topic: "adapter-test-output"
    queryPartitionsInterval: 60000
    properties: "conf/kafka.properties"
  }
  producer: {
    queue: "TEST"
    properties: "conf/artemis.conf"
  }
...
}

Файл artemis.conf:

{
    connectors: "localhost:61616, 127.0.0.1:61617"
    connectorConfigs: {
        sslEnabled: "true"
        keyStorePath: "***"
        keyStorePassword: "*******"
        trustStorePath: "***"
        trustStorePassword: "*******"
    }
    factory: {
        blockOnQueue: false
        loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
        reconnectAttempts: 1
        recoveryInterval: 2000
    }
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}

Файл kafka.properties:

bootstrap.servers = "host:port"
group.id = vault-test
client.id = vault-client
security.protocol = SSL
ssl.endpoint.identification.algorithm = ""
ssl.keystore.location = /path/to/client.test.jks
ssl.truststore.location = /path/to/client.test.jks
ssl.protocol = TLSv1.2
ssl.engine.factory.class = ru.sbt.ss.kafka.VaultSslEngineFactory
ssl.vault.address = "https://host:port/"
ssl.vault.auth.type = APPROLE
ssl.vault.auth.role.id = <role id>
ssl.vault.auth.secret.id = <secret id>
ssl.vault.pki.role.name = <role name>
ssl.vault.pki.common.name = <common name>
ssl.vault.secret.path = <path to secret>
ssl.vault.secret.key = <key>
ssl.vault.secret.keystore = <keystore>
ssl.vault.secret.truststore = <truststore>
ssl.vault.engine.version = 1
ssl.vault.tls.truststore.location = /path/to/vaultclient.jks
ssl.vault.tls.truststore.password = <password>

Пример конфигурации artemis-artemis#

В данном случае EVTA используется как шлюз между брокером очередей SMBX.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"none",
    "eventDiscoveryEnabled": false,
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "artemis.conf",
        "queue": "TO_TEST"
    },
    "producerType": "artemis",
    "consumer": {
        "properties": "artemis.conf",
        "queue": "FROM_TEST"
    },
    "consumerType": "artemis",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Файл artemis.conf:

{
    connectors: "localhost:61616"
    connectorConfigs: {
        sslEnabled: "true"
        keyStorePath: "/path/to/cert.jks"
        keyStorePassword: "password"
        trustStorePath: "/path/to/cert.jks"
        trustStorePassword: "password"
    }
    factory: {
        blockOnQueue: false
        loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
        reconnectAttempts: 1
        recoveryInterval: 2000
        confirmationWindowSize: 1
    }
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}

Пример конфигурации mq-mq#

В данном случае EVTA используется как шлюз между брокерами очередей IBM MQ.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"none",
    "eventDiscoveryEnabled": false,
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "mq.properties",
        "queue": "TO_TEST"
    },
    "producerType": "ibm_mq",
    "consumer": {
        "properties": "mq.properties",
        "queue": "FROM_TEST"
    },
    "consumerType": "ibm_mq",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Пример файла mq.properties:

hostName = localhost
port = 1414
queueManager = QM1
channel = DEV.APP.SVRCONN

Пример конфигурации artemis-mq#

В данном случае EVTA используется как шлюз между брокерами очередей SMBX и IBM MQ.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"none",
    "eventDiscoveryEnabled": false,
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "mq.properties",
        "queue": "TO_TEST"
    },
    "producerType": "ibm_mq",
    "consumer": {
        "properties": "artemis.conf",
        "queue": "FROM_TEST"
    },
    "consumerType": "artemis",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Файл artemis.conf:

{
    connectors: "localhost:61616"
    connectorConfigs: {
        sslEnabled: "true"
        keyStorePath: "/path/to/cert.jks"
        keyStorePassword: "password"
        trustStorePath: "/path/to/cert.jks"
        trustStorePassword: "password"
    }
    factory: {
        blockOnQueue: false
        loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
        reconnectAttempts: 1
        recoveryInterval: 2000
        confirmationWindowSize: 1
    }
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}

Пример файла mq.properties:

hostName = localhost
port = 1414
queueManager = QM1
channel = DEV.APP.SVRCONN

Валидация сообщений и настройка логирования#

Обработка сообщения является промежуточным этапом передачи между получением сообщения из источника и отправкой в точку назначения. Задать шаги обработки можно задать через блок параметров processors в конфигурационном файле adapter.conf.

Каждый этап обработки сообщения не зависит от наличия других шагов и является атомарным процессом. Шаги обработки передаются в этом параметре в формате ключ-значение, где ключ — идентификатор шага обработки, значение — параметры шага.

EVTA предоставляет встроенные шаги обработки сообщений:

  • xsd — проверка тела сообщения по XSD схеме, параметры:

    • schemas — массив/список путей до XSD схем;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;

  • jsonSchema — проверка тела сообщения по JSON схеме, параметры:

    • schemas — массив/список путей до JSON схем;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;

  • avro проверка тела сообщения по AVRO схеме, параметры:

    • schemas — массив/список путей до avro схем;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8

  • log — логирование тела сообщения в лог EVTA, параметры:

    • format — формат тела сообщения, поддерживаемые значения xml, json, либо auto для автоопределения формата, по умолчанию auto;

    • level — уровень логирования сообщения, по умолчанию info;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;

    • logOnlyHeaders — если имеет значение true, то логируются только заголовки сообщений. Значение по умолчанию false;

  • textConversion — перекодировка тела сообщения из одной кодировки в другую:

    • input — кодировка входящего сообщения;

    • output — кодировка исходящего сообщения;

  • header — фильтрация заголовков сообщения:

    • enabled — если имеет значение true, то фильтрация заголовков сообщения включена. Значение по умолчанию true;

    • remove — удаление всех заголовков сообщения. Значение по умолчанию false;

    • include — массив/список заголовков для включения. Значение по умолчанию пустой список;

    • exclude — массив/список заголовков для исключения. Значение по умолчанию пустой список;

  • keyFilter — фильтрация ключа сообщения:

    • enabled — если имеет значение true, то фильтрация ключа сообщения включена. Значение по умолчанию false.

Шаги xsd, jsonSchema, avro выполняются успешно при первой успешной проверке тела сообщения по схеме. Если сообщение не соответствует ни одной схеме, то возникает ошибка обработки. В случае успеха в результате обработки возвращается оригинальное сообщение без изменений.

Шаг log всегда работает без ошибок и не влияет на обработку сообщений, любое перехватываемое исключение записывается в логах EVTA без прерывания дальнейшей обработки.

В шаге header при задании нескольких параметров выполняется только параметр с наивысшим приоритетом (приоритеты в порядке уменьшения — enabled, remove, include, exclude).

Пример конфигурации с валидацией по XSD схеме и логированием тела сообщения#

{
  batchSize: 1000
  timeout: 10000
  retries: 1
  threadCount: 8
  healthCheckPort: 8087
  consumerType: "kafka"
  consumer: {
    topic: "input"
    properties: "conf/consumer.properties"
  }
  producer: {
    topic: "adapter-test-output"
    queryPartitionsInterval: 60000
    properties: "conf/kafka.properties"
  }
  processors: {
    xsd: {
      schemas: ["/path/to/schema/one.xsd", "/path/to/schema/two.xsd"]
    }
    log: {
      format: "xml"
    }
  }
}