Настройки конфигурационного файла adapter.conf#

Данный раздел применим для роли Администратор.

Все конфигурационные настройки EVTA задаются в файле adapter.conf, который необходимо заполнить до запуска процесса установки EVTA. При заполнении конфигурационного файла необходимо ориентироваться на указанные комментарии.

В таблице представлены параметры настройки, задаваемые в файле adapter.conf. Пример заполненного файла приведен ниже:

Параметр

Описание

threadCount

Количество потоков обработки сообщений, по умолчанию равно количеству ядер в системе

timeout

Общий тайм-аут на блокирующие операции

batchSize

Количество сообщений, обрабатываемых за одну итерацию

batchTimeout

Тайм-аут выполнения одной итерации обработки (мс)

retries

Количество попыток запуска EVTA

retryInterval

Интервал между попытками запуска EVTA (мс)

retryMultiplier

Множитель каждой следующей попытки

healthCheckHost

HTTP хост для получения HealthCheck

healthCheckPort

HTTP порт для получения HealthCheck, если значение не передано или равно -1, то HTTP-сервер не поднимается

changeLoggersAvailable

Возможность изменения уровня незаданных в logback логеров, по умолчанию false

changeRootAvailable

Возможность изменения уровня root логера, по умолчанию false

consumer

Параметры получателя

producer

Параметры отправителя

consumerType

Тип получателя, по умолчанию kafka

producerType

Тип отправителя, по умолчанию kafka

eventDiscoveryEnabled

Включение Event Discovery, по умолчанию true

ed

Параметры подключения к Event Discovery

ed: url

Адрес подключения к ETCD

ed: authority

CN сертификата сервера ETCD

ed: ssl.key.password

Пароль от приватного ключа

ed: ssl.keystore.location

Путь до хранилища приватного ключа

ed: ssl.keystore.password

Пароль от хранилища приватного ключа

ed: ssl.keystore.type

Тип хранилища, по умолчанию JKS

ed: ssl.truststore.location

Путь до хранилища доверенных сертификатов

ed: ssl.truststore.password

Пароль от хранилища доверенных сертификатов

ed: ssl.truststore.type

Тип хранилища, по умолчанию JKS

ed: ssl.protocol

Версия протокола TLS, по умолчанию TLSv1.2

ed: ssl.conscrypt

Признак использования библиотеки Conscrypt, по умолчанию true

processors

Настройки дополнительных обработчиков сообщений

egressEnabled

Тип используемого egress, поддерживаются 2 типа: REST и gRPC (none, если не используется)

egressSubscribe/egressPublish

Настройки egress для подписки и публикации соответственно

egressSubscribe/egressPublish: isHttp2

Значение по умолчанию false

egressSubscribe/egressPublish: host

Хост, на котором будет запущен сервер

egressSubscribe/egressPublish: port

Порт, на котором будет запущен сервер

egressSubscribe/egressPublish: secret

Ключ для расшифровки зашифрованных паролей

egressSubscribe/egressPublish: autoCommit

Значение по умолчанию false, отвечает за то, необходимо ли подтверждение получения сообщения со стороны клиента для его коммита

egressSubscribe/egressPublish: useStorageTime

Значение по умолчанию true, отвечает за то, необходима ли проверка максимального времени последнего подключения

egressSubscribe/egressPublish: delay

Значение по умолчанию 60000, периодичность проверки (мс) (используется, если установлен useStorageTime)

egressSubscribe/egressPublish: storageTime

Значение по умолчанию 30000, максимальное время последнего подключения (мс) (используется, если установлен useStorageTime)

egressSubscribe/egressPublish: needInfoAboutAcknowledge

Если флаг установлен, то ответы о публикации будут приходить только после подтверждения от транспорта, иначе сразу после отправки

egressSubscribe/egressPublish: ssl.white.list.path

Путь до файла со списком разрешенных для подключения к EVTA DN сертификатов (проверка происходит, если параметр указан)

egressSubscribe/egressPublish: charset

Кодировка сообщений для вычитки REST, по умолчанию UTF-8. В приоритете — значение из соответствующего заголовка вычитанного сообщения

egressSubscribe/egressPublish: ssl.key.password

Пароль от приватного ключа

egressSubscribe/egressPublish: ssl.keystore.location

Путь до хранилища приватного ключа

egressSubscribe/egressPublish: ssl.keystore.password

Пароль от хранилища приватного ключа

egressSubscribe/egressPublish: ssl.truststore.password

Пароль от хранилища доверенных сертификатов

egressSubscribe/egressPublish: ssl.protocol

Тип протокола SSL, который будет получен из vault. Значение по умолчанию TLSv1.2

egressSubscribe/egressPublish: ssl.cipher.suites

Перечисление используемых шифров через запятую, по умолчанию – все доступные

egressSubscribe/egressPublish: ssl.keystore.type

Тип хранилища, по умолчанию JKS

egressSubscribe/egressPublish: ssl.truststore.type

Тип хранилища, по умолчанию JKS

egressSubscribe/egressPublish: ssl.endpoint.identification.algorithm

Алгоритм идентификации endpoint подключения для проверки имени хоста сервера с использованием сертификата сервера. Значение по умолчанию https

egressSubscribe/egressPublish: ssl.conscrypt

Использование библиотеки concrypt. Значение по умолчанию false

egressSubscribe/egressPublish: ssl.allowed.dn

Список разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.allowed.dn.file

Расположение файла со списком разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.vault.tls.key.password

Пароль от приватного ключа хранилища

egressSubscribe/egressPublish: ssl.vault.tls.keystore.location

Путь до keystore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.keystore.password

Пароль от keystore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.truststore.location

Путь до truststore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.truststore.password

Пароль от truststore хранилища

egressSubscribe/egressPublish: ssl.vault.tls.protocol

Для хранилища: тип протокола SSL, который будет получен из vault. Значение по умолчанию TLSv1.2

egressSubscribe/egressPublish: ssl.vault.tls.keystore.type

Формат файла keystore хранилища. Значение по умолчанию JKS

egressSubscribe/egressPublish: ssl.vault.tls.truststore.type

Формат файла truststore хранилища. Значение по умолчанию JKS

egressSubscribe/egressPublish: ssl.vault.tls.endpoint.identification.algorithm

Для хранилища: алгоритм идентификации endpoint подключения для проверки имени сервера с использованием сертификата сервера. Значение по умолчанию https

egressSubscribe/egressPublish: ssl.vault.tls.conscrypt

Для хранилища: использование библиотеки conscrypt. Значение по умолчанию false

egressSubscribe/egressPublish: ssl.vault.tls.allowed.dn

Для хранилища: список разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.vault.tls.allowed.dn.file

Для хранилища: расположение файла со списком разрешенных DN сертификатов

egressSubscribe/egressPublish: ssl.vault.address

HashiCorp Vault URL

egressSubscribe/egressPublish: ssl.vault.namespace

Пространство имен HashiCorp Vault

egressSubscribe/egressPublish: ssl.vault.auth.type

Тип аутентификации. Доступные значения:approle, token, password, cert. Значение по умолчанию approle

egressSubscribe/egressPublish: ssl.vault.auth.mount

Путь монтирования аутентификации

egressSubscribe/egressPublish: ssl.vault.auth.role.id

Role ID для типа аутентификации approle

egressSubscribe/egressPublish: ssl.vault.auth.secret.id

Secret ID для типа аутентификации approle

egressSubscribe/egressPublish: ssl.vault.auth.token

Токен для типа аутентификации token

egressSubscribe/egressPublish: ssl.vault.auth.username

Имя пользователя для аутентификации типа password

egressSubscribe/egressPublish: ssl.vault.auth.password

Пароль для аутентификации типа password

egressSubscribe/egressPublish: ssl.vault.tls.enable

Наличие защищенного соединения с HashiCorp Vault. Значение по умолчанию true

egressSubscribe/egressPublish: ssl.vault.namespace

Vault namespace

egressSubscribe/egressPublish: ssl.vault.pki.mount

Путь монтирования Vault PKI API. Значение по умолчанию pki

egressSubscribe/egressPublish: ssl.vault.pki.role.name

Имя роли в PKI Engine

egressSubscribe/egressPublish: ssl.vault.pki.common.name

Common name для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.crl.enable

Значение по умолчанию true, параметр для управления механизмом проверки сертификата по списку отозванных

egressSubscribe/egressPublish: ssl.vault.pki.email

Email для получения сертификата из HashiCorp Vault

egressSubscribe/egressPublish: ssl.vault.pki.alt.names

Alternative names для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.alt.ip

Alternative ip для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.ttl

TTL для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.csr

CSR для генерации сертификатов

egressSubscribe/egressPublish: ssl.vault.pki.trust.only

Загружать только truststore. Значение по умолчанию false

egressSubscribe/egressPublish: ssl.vault.secret.path

Секретный путь для ключа, keystore и truststore паролей

egressSubscribe/egressPublish: ssl.vault.secret.key

Secret key для пароля ключа

egressSubscribe/egressPublish: ssl.vault.secret.keystore

Secret key для keystore пароля

egressSubscribe/egressPublish: ssl.vault.secret.truststore

Secret key для truststore пароля

egressSubscribe/egressPublish: ssl.vault.before.expire

Время до даты экспирации для обновления сертификатов. Значение по умолчанию PT0S

egressSubscribe/egressPublish: ssl.vault.alias.ca

Псевдоним для CA сертификата в truststore

egressSubscribe/egressPublish: ssl.vault.alias.key

Псевдоним для ключа и сертификата в keystore

egressSubscribe/egressPublish: ssl.vault.engine.version

Версия API для key-value машины. Значение по умолчанию 2

egressSubscribe/egressPublish: ssl.vault.kv.mount.path

Путь до API движка Key-Value хранилища секретов (обязательно при ssl.vault.engine.version: 2)

egressSubscribe/egressPublish: ssl.vault.retries

Количество повторных попыток запроса. Значение по умолчанию 5

egressSubscribe/egressPublish: ssl.vault.retry.interval

Интервал между попытками (мс). Значение по умолчанию 500

egressSubscribe/egressPublish: ssl.vault.timeout

Тайм-аут запроса (с). Значение по умолчанию 3

urlConfig

Путь до файла с настройками endpoint, используется для egress

Логика обработки сообщений#

Логику обработки сообщений можно задать следующими параметрами:

  • batchSize – определяет количество сообщений, которые необходимо обработать на одну итерацию (пачка);

  • batchTimeout – определяет время выполнения одной итерации обработки.

При этом, если в конфигурации adapter.conf заданы оба параметра, то приоритет будет у batchTimeout – сообщения будут опрокидываться по истечению времени, заданному в batchTimeout, не дожидаясь накопления всей пачки сообщений, размер которой был задан в параметре batchSize.

Валидация сообщений и настройка логирования#

Обработка сообщения является промежуточным этапом передачи между получением сообщения из источника и отправкой в точку назначения. Задать шаги обработки можно задать через блок параметров processors в конфигурационном файле adapter.conf.

Каждый этап обработки сообщения не зависит от наличия других шагов и является атомарным процессом. Шаги обработки передаются в этом параметре в формате ключ-значение, где ключ — идентификатор шага обработки, значение — параметры шага.

EVTA предоставляет встроенные шаги обработки сообщений:

  • xsd — проверка тела сообщения по XSD схеме, параметры:

    • schemas — массив/список путей до XSD схем;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;

  • jsonSchema — проверка тела сообщения по JSON схеме, параметры:

    • schemas — массив/список путей до JSON схем;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;

  • avro проверка тела сообщения по AVRO схеме, параметры:

    • schemas — массив/список путей до avro схем;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8

  • log — логирование тела сообщения в лог EVTA, параметры:

    • format — формат тела сообщения, поддерживаемые значения xml, json, либо auto для автоопределения формата, по умолчанию auto;

    • level — уровень логирования сообщения, по умолчанию info;

    • encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;

    • logOnlyHeaders — если имеет значение true, то логируются только заголовки сообщений. Значение по умолчанию false;

  • textConversion — перекодировка тела сообщения из одной кодировки в другую:

    • input — кодировка входящего сообщения;

    • output — кодировка исходящего сообщения;

  • header — фильтрация заголовков сообщения:

    • enabled — если имеет значение true, то фильтрация заголовков сообщения включена. Значение по умолчанию true;

    • remove — удаление всех заголовков сообщения. Значение по умолчанию false;

    • include — массив/список заголовков для включения. Значение по умолчанию пустой список;

    • exclude — массив/список заголовков для исключения. Значение по умолчанию пустой список;

  • keyFilter — фильтрация ключа сообщения:

    • enabled — если имеет значение true, то фильтрация ключа сообщения включена. Значение по умолчанию false.

Шаги xsd, jsonSchema, avro выполняются успешно при первой успешной проверке тела сообщения по схеме. Если сообщение не соответствует ни одной схеме, то возникает ошибка обработки. В случае успеха в результате обработки возвращается оригинальное сообщение без изменений.

Шаг log всегда работает без ошибок и не влияет на обработку сообщений, любое перехватываемое исключение записывается в логах EVTA без прерывания дальнейшей обработки.

В шаге header при задании нескольких параметров выполняется только параметр с наивысшим приоритетом (приоритеты в порядке уменьшения — enabled, remove, include, exclude).

Пример заполненного файла adapter.conf

{
   "batchSize": "1000",
   "egressEnabled":"rest",
   "egressPublish": {
       "ssl.vault.tls.keystore.location": "path/to/vaultclient.jks"
       "ssl.vault.tls.keystore.password":"***",
       "ssl.vault.tls.key.password":"***",
       "ssl.vault.tls.truststore.location": "path/to/vaultclient.jks"
       "ssl.vault.tls.truststore.password":"***",
       "ssl.protocol":"TLSv1.2",
       "ssl.engine.factory.class":"ru.sbt.ss.kafka.VaultSslEngineFactory",
       "ssl.vault.auth.type":"CERTIFICATE",
       "ssl.keystore.location": "path/to/client.test1.jks",
       "ssl.vault.alias.key": "test-1",
       "ssl.truststore.location": "path/to/client.test1.jks",
       "ssl.vault.address":"https://host:port/",
       "ssl.vault.namespace":"",
       "ssl.vault.pki.common.name":"KafkaClient1",
       "ssl.vault.pki.email":"example@mail.com",
       "ssl.vault.pki.alt.names":"localhost",
       "ssl.vault.pki.alt.ip":"ip",
       "ssl.vault.secret.path":"path/to/certstore",
       "ssl.vault.secret.key":"key",
       "ssl.vault.secret.keystore":"keystore",
       "ssl.vault.secret.truststore":"truststore",
       "ssl.vault.engine.version":"1",
       "ssl.vault.pki.role.name":"test1",
       "ssl.vault.pki.crl.enable": "true",
       "host": "localhost",
       "port": 9002,
       "ssl.white.list.path":"path/to/allowed-dn.txt",
       "needInfoAboutAcknowledge": true,
       "isHttp2":false
      }
   "egressSubscribe": {
      "ssl.vault.tls.keystore.location": "path/to/certs/vaultclient.jks"
      "ssl.vault.tls.keystore.password":"***",
      "ssl.vault.tls.key.password":"***",
      "ssl.vault.tls.truststore.location": "path/to/certs/vaultclient.jks"
      "ssl.vault.tls.truststore.password":"***",
      "ssl.protocol":"TLSv1.2",
      "ssl.engine.factory.class":"ru.sbt.ss.kafka.VaultSslEngineFactory",
      "ssl.vault.auth.type":"CERTIFICATE",
      "ssl.keystore.location": "path/to/certs/client.test1.jks",
      "ssl.vault.alias.key": "test-1",
      "ssl.truststore.location": "path/to/certs/client.test1.jks",
      "ssl.vault.address":"https://host:port/",
      "ssl.vault.namespace":"",
      "ssl.vault.pki.common.name":"KafkaClient1",
      "ssl.vault.pki.email":"example@mail.com",
      "ssl.vault.pki.alt.names":"localhost",
      "ssl.vault.pki.alt.ip":"ip",
      "ssl.vault.secret.path":"path/to/certstore",
      "ssl.vault.secret.key":"key",
      "ssl.vault.secret.keystore":"keystore",
      "ssl.vault.secret.truststore":"truststore",
      "ssl.vault.engine.version":"1",
      "ssl.vault.pki.role.name":"test1",
      "ssl.vault.pki.crl.enable": "true",
      "host": "localhost",
      "port": 9003,
      "ssl.white.list.path":"path/to/allowed-dn.txt",
      "charset": "UTF-8",
      "isHttp2":false
     }
   "ed": {
       "url": "host:port",
       "authority": "DN cert",
       "ssl.keystore.location":"path/to/grpc-adapter.jks",
       "ssl.keystore.password":"***",
       "ssl.truststore.location":"path/to/grpc-adapter.jks",
       "ssl.truststore.password":"***",
       "ssl.protocol":"TLSv1.2",
       "ssl.conscrypt": false

   },
   "urlConfigPath":"path/to/urlConfig.json",
   "eventDiscoveryEnabled": false,
   "healthCheckPort": "port",
   "brokerWhiteListPath": "path/to/allowed-dn2.txt",
   "processors": {
       "log": {
           "format": "auto",
           "level": "info"
       }
   },
   "producer": {
       "properties": "path/to/prop.properties",
       "topic": "testTopic"
   },
   "producerType": "kafka",
   "retries": "3",
   "threadCount": 1,
   "timeout": "1000"
}

Поддерживаемые типы транспорта для параметров Сonsumer и Producer:#

  • kafka — Platform V Corax/Apache Kafka;

  • artemis — брокер Artemis MQ;

  • ibm_mq — брокер IBM MQ;

  • rabbit — брокер RabbitMQ.

Функциональность, связанный с IBM MQ, не дорабатывается. В дальнейшем планируется добавление других типов транспорта.

Параметры получателя и отправителя состоят из 2-х частей:

  1. Параметры, специфичные для EVTA, задаются в том же объекте Consumer или Producer.

  2. Параметры, специфичные для библиотеки транспорта, передаются в отдельном файле через параметр properties объектов Consumer или Producer.

Возможные варианты настроек Consumer и Producer указаны в таблице:

Тип брокера

Platform V Corax/Apache Kafka

artemis

ibm_mq

rabbit

Consumer

topic — имя входящего топика
deadLetter — топик ошибочных сообщений, опционален
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами библиотеки Apache Kafka
eventUrl — адрес события в ETCD, параметры из ETCD перезаписывают параметры из *.properties

queue — имя входящей очереди
deadLetter — очередь ошибочных сообщений, опционален
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами, содержащего:
connectors — список брокеров для подключения, в формате host:port;
connectorConfigs — общие параметры к org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
factory — параметры к org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
sslFactoryConfigPath — путь до конфигурационного файла с настройками подключения к HashiCorp Vault
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию «»
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально

queue — имя входящей очереди
deadLetter — очередь ошибочных сообщений, опционален
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами к com.ibm.mq.jms.MQConnectionFactory, параметр transportType игнорируется и всегда имеет значение 1
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию «»
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально,
keyStorePath — путь до хранилища приватного ключа
keyStorePassword — пароль от хранилища приватного ключа
trustStorePath — путь до хранилища доверенных сертификатов
trustStorePassword — пароль от хранилища доверенных сертификатов
protocol — версия протокола TLS, по умолчанию TLSv1.2
storeType — тип хранилища, по умолчанию JKS
conscrypt — признак использования библиотеки Conscrypt, по умолчанию true

properties — путь до файла с параметрами, содержащего:
uri — URI AMQP,опционально;
user — имя пользователя для подключения к брокеру, опционально;
password — пароль пользователя, опционально;
vhost — виртуальный хост, используемый при подключении к брокеру, опционально;
recoveryInterval — сколько времени ждать автоматическое восстановление перед попыткой повторного подключения (мс), опционально;
sasl — при значении external устанавливает external конфигурацию sasl, опционально;
nodes — массив известных адресов брокера, перечисленных через запятую (host1:port1,host2:port2);
ssl.key.password — пароль от приватного ключа;
ssl.keystore.location — путь до хранилища приватного ключа;
ssl.keystore.password — пароль от хранилища приватного ключа;
ssl.keystore.type — тип хранилища, по умолчанию JKS;
ssl.truststore.location — путь до хранилища доверенных сертификатов;
ssl.truststore.password — пароль от хранилища доверенных сертификатов;
ssl.truststore.type — тип хранилища, по умолчанию JKS;
ssl.protocol — версия протокола TLS, по умолчанию TLSv2;
prefetchSize — максимальное количество сообщений, которые будет доставлять сервер, опционально. Значение 0, если не ограничено;
receiveMode — тип подписки, может принимать значения subscribe и poll, по умолчанию subscribe;
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию «»;
queue — имя очереди, по умолчанию input;
timeout timeout блокирующих операций, по умолчанию 1000;
deadLetterExchange — точка обмена для ошибочных сообщений, опционален;
deadLetterRoutingKey — ключ для ошибочных сообщений, опционален;
deadLetterPersistence — режим персистентной отправки для ошибочных сообщений, опционален;
needToConvertToUtf8 — необходимо ли приводить вычитанное сообщение в кодировку UTF-8 (значение изначальной кодировки берется из ContentEncoding, вычитанного сообщения), по умолчанию false

Producer

topic — имя исходящего топика
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
batchSize — количество сообщений, обрабатываемых за одну итерацию, опционально
queryPartitionsInterval — интервал опроса топика на количество доступных партиций в миллисекундах
partitionerType — тип алгоритма выбора партиции при отправке сообщения, доступные значения:
RoundRobin — равномерно распределяет сообщения по партициям, значение по умолчанию;
MurMur2 — вычисляет номер партиции на основе hash функции MurMur2 от ключа или тела сообщения;
header — вычисляет партицию на основе значения заголовка сообщения с именем из параметра partitionerHeader;
random — номер партиции выбирается случайным образом;
properties — путь до файла с параметрами библиотеки Apache Kafka
eventUrl — адрес события в Event Discovery, параметры из Event Discovery перезаписывают параметры из properties

queue — имя входящей очереди,
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами, содержащего:
connectors — список брокеров для подключения, в формате host:port;
connectorConfigs — общие параметры к org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
factory — параметры к org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
sslFactoryConfigPath — путь до конфигурационного файла с настройками подключения к HashiCorp Vault;
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию «»;
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально

queue — имя входящей очереди
timeout — timeout блокирующих операций
secret — ключ для расшифровки зашифрованных паролей
properties — путь до файла с параметрами к com.ibm.mq.jms.MQConnectionFactory, параметр transportType игнорируется и всегда имеет значение 1
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию «»
messageType — тип исходящего сообщения, значения:
byte — бинарное сообщение, по умолчанию;
text — текстовое сообщение
messageEncoding — кодировка по умолчанию входного бинарного сообщения, если не задан заголовок JMS_IBM_Character_Set
user — имя пользователя для подключения к брокеру, опционально
password — пароль пользователя, опционально
keyStorePath — путь до хранилища приватного ключа
keyStorePassword — пароль от хранилища приватного ключа
trustStorePath — путь до хранилища доверенных сертификатов
trustStorePassword — пароль от хранилища доверенных сертификатов
protocol — версия протокола TLS, по умолчанию TLSv1.2
storeType — тип хранилища, по умолчанию JKS
conscrypt — признак использования библиотеки Conscrypt, по умолчанию true

properties — путь до файла с параметрами, содержащего:
uri — URI AMQP, опционально;
user — имя пользователя для подключения к брокеру, опционально;
password — пароль пользователя, опционально;
vhost — виртуальный хост, используемый при подключении к брокеру, опционально;
recoveryInterval — сколько времени ждать автоматическое восстановление перед попыткой повторного подключения (мс), опционально;
sasl — при значении external устанавливает external конфигурацию sasl, опционально;
nodes — массив известных адресов брокера, перечисленных через запятую (host1:port1,host2:port2);
ssl.key.password — пароль от приватного ключа;
ssl.keystore.location — путь до хранилища приватного ключа;
ssl.keystore.password — пароль от хранилища приватного ключа;
ssl.keystore.type — тип хранилища, по умолчанию JKS;
ssl.truststore.location — путь до хранилища доверенных сертификатов;
ssl.truststore.password — пароль от хранилища доверенных сертификатов;
ssl.truststore.type — тип хранилища, по умолчанию JKS;
ssl.protocol — версия протокола TLS, по умолчанию TLSv1.2;
prefetchSize — максимальное количество сообщений, которые будет доставлять сервер, опционально. Значение 0, если не ограничено;
keyHeader — имя заголовка сообщения для использования в качестве значения ключа, значение по умолчанию «»;
publishAck — ждать ли подтверждения получения от rabbit перед подтверждением сообщения, по умолчанию false;
timeout — timeout блокирующих операций, по умолчанию 1000;
exchange — точка обмена для отправки сообщений;
routingKey — ключ для отправки сообщений;
persistence режим персистентной отправки.

Пример конфигурации kafka-kafka#

В данном случае EVTA используется как перекладчик из одного топика Kafka в другой.

Файл adapter.conf:

{
...
  batchSize: 1000
  timeout: 10000
  threadCount: 2
  healthCheckPort: 8081
  consumer: {
    topic: "input"
    properties: "conf/consumer.properties"
  }
  producer: {
    topic: "output"
    bufferSize: 1000
    queryPartitionsInterval: 5000
    properties: "conf/producer.properties"
  
  }
...
}

Файл consumer.properties:

bootstrap.servers = "host.docker.internal:9092"
client.id ="consumer.id"
group.id = "consumer.group.id"

Файл producer.properties:

bootstrap.servers = "host.docker.internal:9092"
client.id = "producer.id"
group.id = "producer.group.id"

Пример конфигурации mq-kafka#

В данном случае EVTA используется как перекладчик между брокером очередей IBM MQ и брокером Kafka.

Файл adapter.conf:

{
  ...
  batchSize: 1000
  timeout: 10000
  retries: 1
  threadCount: 8
  healthCheckPort: 8087
  consumerType: "ibm_mq"
  consumer: {
    queue: "TEST"
    protocol: "TLSv1"
    conscrypt: false
    keyStorePath: "***"
    keyStorePassword: "*******"
    trustStorePath: "***"
    trustStorePassword: "*******"
    properties: "conf/mq.properties"
  }
  producer: {
    topic: "adapter-test-output"
    queryPartitionsInterval: 60000
    properties: "conf/kafka.properties"
  }
...
}

Файл mq.properties:

hostName = "localhost"
port = "1420"
queueManager = "PMS.PPRB.IM1"
channel = "CEP.SVRCONN"
SSLCipherSuite = "TLS_RSA_WITH_AES_128_CBC_SHAм

Файл kafka.properties:

bootstrap.servers = "host:port"
group.id = "vault-test"
client.id = "vault-client"
security.protocol = "SSL"
ssl.endpoint.identification.algorithm = ""
ssl.keystore.location = "/path/to/client.test.jks"
ssl.truststore.location = "/path/to/client.test.jks"
ssl.protocol = "TLSv1.2"
ssl.engine.factory.class = "ru.sbt.ss.kafka.VaultSslEngineFactory"
ssl.vault.address = "https://host:port/"
ssl.vault.auth.type = "APPROLE"
ssl.vault.auth.role.id = "<role id>"
ssl.vault.auth.secret.id = "<secret id>"
ssl.vault.pki.role.name = "<role name>"
ssl.vault.pki.common.name = "<common name>"
ssl.vault.secret.path = "<path to secret>"
ssl.vault.secret.key = "<key>"
ssl.vault.secret.keystore = "<keystore>"
ssl.vault.secret.truststore = "<truststore>"
ssl.vault.engine.version = "1"
ssl.vault.tls.truststore.location = "/path/to/vaultclient.jks"
ssl.vault.tls.truststore.password = "<password>"

Пример конфигурации kafka-artemis#

В данном случае EVTA используется как перекладчик между брокером очередей SMBX и брокером Kafka.

Файл adapter.conf:

{...
  batchSize: 1000
  timeout: 10000
  threadCount: 2
  healthCheckPort: 8087
  producerType: artemis
  consumer: {
    topic: "adapter-test-output"
    queryPartitionsInterval: 60000
    properties: "conf/kafka.properties"
  }
  producer: {
    queue: "TEST"
    properties: "conf/artemis.conf"
  }
...
}

Файл artemis.conf:

{
    connectors: "localhost1:61616, localhost2:61617"
    connectorConfigs: {
        sslEnabled: "true"
        keyStorePath: "***"
        keyStorePassword: "*******"
        trustStorePath: "***"
        trustStorePassword: "*******"
    }
    factory: {
        blockOnQueue: false
        loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
        reconnectAttempts: 1
        recoveryInterval: 2000
    }
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}

Файл kafka.properties:

bootstrap.servers = "host:port"
group.id = "vault-test"
client.id = "vault-client"
security.protocol = "SSL"
ssl.endpoint.identification.algorithm = ""
ssl.keystore.location = "/path/to/client.test.jks"
ssl.truststore.location = "/path/to/client.test.jks"
ssl.protocol = "TLSv1.2"
ssl.engine.factory.class = "ru.sbt.ss.kafka.VaultSslEngineFactory"
ssl.vault.address = "https://host:port/"
ssl.vault.auth.type = "APPROLE"
ssl.vault.auth.role.id = "<role id>"
ssl.vault.auth.secret.id = "<secret id>"
ssl.vault.pki.role.name = "<role name>"
ssl.vault.pki.common.name = "<common name>"
ssl.vault.secret.path = "<path to secret>"
ssl.vault.secret.key = "<key>"
ssl.vault.secret.keystore = "<keystore>"
ssl.vault.secret.truststore = "<truststore>"
ssl.vault.engine.version = "1"
ssl.vault.tls.truststore.location = "/path/to/vaultclient.jks"
ssl.vault.tls.truststore.password = "<password>"

Пример конфигурации grpc-kafka#

В данном случае EVTA позволяет обращаться к брокеру сообщений Kafka при помощи gRPC взаимодействия.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"grpc",
    "egress": {
        "type":"publish",
        "host": "localhost",
         "ssl" {
           "key" {
             "password": "*******"
           }
           "keystore": {
             "location": "conf/cep99usr.jks"
             "password": "*******"
           }
           "truststore": {
             "location": "conf/cep99usr.jks"
             "password": "*******"
           }
         }
        "port": 9003
    }
    "urlConfigPath":"/path/to/urlConfig.json",
    "eventDiscoveryEnabled": false,
    "producer": {
        "properties": "/path/to/producer.properties",
        "topic": "testTopic"
    },
    "producerType": "kafka",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Файл urlConfig.json:

{
  "fpss": {
    "K2_IAZCommon": {
      "Common": {
        "ALPHA": {
          "System": {
            "TESTA": {
              "1": {
                "type": "kafka",
                "topic": "TESTB",
                "properties": "/path/to/producer.properties"
              }
            },
            "TESTB": {
              "1": {
                "type": "kafka",
                "topic": "TESTA",
                "properties": "/path/to/producer.properties"
              }
            }
          }
        }
      }
    }
  }
}

Пример конфигурации rabbit-rabbit#

В данном случае EVTA используется как перекладчик из одной очереди RabbitMQ в другую.

Файл adapter.conf:

{
    "batchSize": "1000",
    "eventDiscoveryEnabled": false,
    "healthCheckPort": "8083",
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "/path/to/producer.properties",
    },
    "producerType": "rabbit",
    "consumer": {
        "properties":"/path/to/consumer.properties",
    },
    "consumerType": "rabbit",
    "retries": "1",
    "threadCount": 1,
    "timeout": "1000",
}

Файл producer.properties:

nodes = "localhost:5671"
user = "guest"
password = "guest"
vhost = "Some_Virtual_Host"
exchange = "some_exchange"
routingKey = "routing_key_4"

ssl.keystore.location = "tmp/rabbit-vault-keystore.jks"
ssl.vault.tls.keystore.location = "/path/to/vaultclient.jks"
ssl.vault.tls.keystore.password = "{{ password }}"
ssl.vault.tls.key.password = "{{ password }}"
ssl.vault.tls.truststore.location = "/path/to/vaultclient.jks"
ssl.vault.tls.truststore.password = "{{ password }}"
ssl.protocol = "TLSv1.2"
ssl.keystore.mode = "600"
ssl.endpoint.identification.algorithm = ""
ssl.truststore.location = "tmp/rabbit-vault-truststore.jks"
ssl.truststore.mode = "600"
ssl.vault.address = "https://localhost:8200"
ssl.vault.tls.enable = "true"
ssl.vault.auth.type = "APPROLE"
ssl.vault.auth.role.id = "d2a190c0-29d4-6dd2-f474-c4d15524febe"
ssl.vault.auth.secret.id = "{{ secret_id }}"
ssl.vault.pki.role.name = "default"
ssl.vault.pki.common.name = "test"
ssl.vault.secret.path = "kv1/certstore"
ssl.vault.secret.key = "key"
ssl.vault.secret.keystore = "keystore"
ssl.vault.secret.truststore = "truststore"
ssl.vault.engine.version = "1"
ssl.vault.pki.email = "vasya@rambler.ru"
ssl.vault.namespace = ""
ssl.key.password = "{{ password }}"

Файл consumer.properties:

nodes = "localhost:5671"
user = "guest"
password = "guest"
vhost = "Some_Virtual_Host"
queue = "test.5"

ssl.keystore.location = "tmp/rabbit-vault-keystore.jks"
ssl.vault.tls.keystore.location = "/path/to/vaultclient.jks"
ssl.vault.tls.keystore.password = "{{ password }}"
ssl.vault.tls.key.password = "{{ password }}"
ssl.vault.tls.truststore.location = "/path/to/vaultclient.jks"
ssl.vault.tls.truststore.password = "{{ password }}"
ssl.protocol = "TLSv1.2"
ssl.keystore.mode = "600"
ssl.endpoint.identification.algorithm = ""
ssl.truststore.location = "tmp/rabbit-vault-truststore.jks"
ssl.truststore.mode = "600"
ssl.vault.address = "https://localhost:8200"
ssl.vault.tls.enable = "true"
ssl.vault.auth.type = "APPROLE"
ssl.vault.auth.role.id = "d2a190c0-29d4-6dd2-f474-c4d15524febe"
ssl.vault.auth.secret.id = "{{ secret_id }}"
ssl.vault.pki.role.name = "default"
ssl.vault.pki.common.name = "test"
ssl.vault.secret.path = "kv1/certstore"
ssl.vault.secret.key = "key"
ssl.vault.secret.keystore = "keystore"
ssl.vault.secret.truststore = "truststore"
ssl.vault.engine.version = "1"
ssl.vault.pki.email = "vasya@rambler.ru"
ssl.vault.namespace = ""
ssl.key.password = "{{ password }}"

Файл urlConfig.json для rabbitMQ:

{
  "fpss": {
    "Domain": {
      "Federation": {
        "segment": {
          "System": {
            "TESTA": {
              "1": {
                "type": "rabbit",
                "properties": "/path/to/rabbitmq.properties",
                "queue": "test.4",
                "exchange": "some_exchange",
                "routingKey": "routing_key_4"
              }
            },
            "TESTB": {
              "1": {
                "type": "rabbit",
                "properties": "/path/to/rabbitmq.properties",
                "queue": "test.2"
              }
            }
          }
        }
      }
    }
  }
}

Пример конфигурации artemis-artemis#

В данном случае EVTA используется как перекладчик между брокерами очередей SMBX.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"none",
    "eventDiscoveryEnabled": false,
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "artemis.conf",
        "queue": "TO_TEST"
    },
    "producerType": "artemis",
    "consumer": {
        "properties": "artemis.conf",
        "queue": "FROM_TEST"
    },
    "consumerType": "artemis",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Файл artemis.conf:

{
    connectors: "localhost:61616"
    connectorConfigs: {
        sslEnabled: "true"
        keyStorePath: "/path/to/cert.jks"
        keyStorePassword: "password"
        trustStorePath: "/path/to/cert.jks"
        trustStorePassword: "password"
    }
    factory: {
        blockOnQueue: false
        loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
        reconnectAttempts: 1
        recoveryInterval: 2000
        confirmationWindowSize: 1
    }
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}

Пример конфигурации mq-mq#

В данном случае EVTA используется как перекладчик между брокерами очередей IBM MQ.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"none",
    "eventDiscoveryEnabled": false,
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "mq.properties",
        "queue": "TO_TEST"
    },
    "producerType": "ibm_mq",
    "consumer": {
        "properties": "mq.properties",
        "queue": "FROM_TEST"
    },
    "consumerType": "ibm_mq",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Пример файла mq.properties:

hostName = "localhost"
port = "1414"
queueManager = "QM1"
channel = "DEV.APP.SVRCONN"

Пример конфигурации artemis-mq#

В данном случае EVTA используется как перекладчик между брокерами очередей SMBX и IBM MQ.

Файл adapter.conf:

{
    "batchSize": "1000",
    "egressEnabled":"none",
    "eventDiscoveryEnabled": false,
    "processors": {
        "log": {
            "format": "auto",
            "level": "info"
        }
    },
    "producer": {
        "properties": "mq.properties",
        "queue": "TO_TEST"
    },
    "producerType": "ibm_mq",
    "consumer": {
        "properties": "artemis.conf",
        "queue": "FROM_TEST"
    },
    "consumerType": "artemis",
    "retries": "3",
    "threadCount": 1,
    "timeout": "1000"
}

Файл artemis.conf:

{
    connectors: "localhost:61616"
    connectorConfigs: {
        sslEnabled: "true"
        keyStorePath: "/path/to/cert.jks"
        keyStorePassword: "password"
        trustStorePath: "/path/to/cert.jks"
        trustStorePassword: "password"
    }
    factory: {
        blockOnQueue: false
        loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
        reconnectAttempts: 1
        recoveryInterval: 2000
        confirmationWindowSize: 1
    }
sslFactoryConfigPath: "src/test/resources/artemis-factory.properies"
}

Пример файла mq.properties:

hostName = "localhost"
port = "1414"
queueManager = "QM1"
channel = "DEV.APP.SVRCONN"

Пример файла artemis-factory.properties:

ssl.endpoint.identification.algorithm=""
ssl.keystore.location="tmp/artemis-vault-keystore.jks"
ssl.keystore.mode="600"
ssl.truststore.location="tmp/artemis-vault-truststore.jks"
ssl.truststore.mode="600"
ssl.vault.address="https://vault-host:vault-port"
ssl.vault.auth.role.id="<role_id_for_vault>"
ssl.vault.auth.secret.id="<secret_id_for_vault>"
ssl.vault.auth.type="APPROLE"
ssl.vault.engine.version="1"
ssl.vault.namespace=""
ssl.vault.pki.common.name="<common_name>"
ssl.vault.pki.email="vasya@rambler.ru"
ssl.vault.pki.role.name="default"
ssl.vault.secret.key="key"
ssl.vault.secret.keystore="keystore"
ssl.vault.secret.path="kv1/certstore"
ssl.vault.secret.truststore="truststore"
ssl.vault.tls.enable="true"
ssl.vault.tls.truststore.location="ssl/vault-client.jks"
ssl.vault.tls.truststore.password="<truststore_password>"

Пример конфигурации с валидацией по XSD схеме и логированием тела сообщения#

{
  batchSize: 1000
  timeout: 10000
  retries: 1
  threadCount: 8
  healthCheckPort: 8087
  consumerType: "kafka"
  consumer: {
    topic: "input"
    properties: "conf/consumer.properties"
  }
  producer: {
    topic: "adapter-test-output"
    queryPartitionsInterval: 60000
    properties: "conf/kafka.properties"
  }
  processors: {
    xsd: {
      schemas: ["/path/to/schema/one.xsd", "/path/to/schema/two.xsd"]
    }
    log: {
      format: "xml"
    }
  }
}