Настройки конфигурационного файла adapter.conf#
Все конфигурационные настройки EVTA задаются в файле adapter.conf, который необходимо заполнить до запуска процесса установки EVTA. При заполнении конфигурационного файла необходимо ориентироваться на указанные комментарии.
В таблице представлены параметры настройки, задаваемые в файле adapter.conf. Пример заполненного файла приведен ниже:
Параметр |
Описание |
|---|---|
threadCount |
Количество потоков обработки сообщений, по умолчанию равно количеству ядер в системе |
timeout |
Общий таймаут на блокирующие операции |
batchSize |
Размер сообщений, обрабатываемый за одну итерацию |
retries |
Количество попыток запуска EVTA |
retryInterval |
Интервал между попытками запуска EVTA (мс) |
retryMultiplier |
Множитель каждой следующей попытки |
healthCheckHost |
HTTP host для получения HealthCheck |
healthCheckPort |
HTTP порт для получения HealthCheck, если значение не передано или равно |
consumer |
Параметры получателя |
producer |
Параметры отправителя |
consumerType |
Тип получателя, по умолчанию |
producerType |
Тип отправителя, по умолчанию |
eventDiscoveryEnabled |
Включение Event Discovery, по умолчанию |
ed |
Параметры подключения к Event Discovery |
ed: url |
Адрес подключения к ETCD |
ed: authority |
CN сертификата сервера ETCD |
ed: ssl.key.password |
Пароль от приватного ключа |
ed: ssl.keystore.location |
Путь до хранилища приватного ключа |
ed: ssl.keystore.password |
Пароль от хранилища приватного ключа |
ed: ssl.keystore.type |
Тип хранилища, по умолчанию |
ed: ssl.truststore.location |
Путь до хранилища доверенных сертификатов |
ed: ssl.truststore.password |
Пароль от хранилища доверенных сертификатов |
ed: ssl.truststore.type |
Тип хранилища, по умолчанию |
ed: ssl.protocol |
Версия протокола |
ed: ssl.conscrypt |
Признак использования библиотеки Conscrypt, по умолчанию |
processors |
Настройки дополнительных обработчиков сообщений |
egressEnabled |
Тип используемого egress, поддерживаются 2 типа: |
egressSubscribe/egressPublish |
Настройки egress для подписки и публикации соответственно |
egressSubscribe/egressPublish: isHttp2 |
Значение по умолчанию |
egressSubscribe/egressPublish: host |
Хост, на котором будет запущен сервер |
egressSubscribe/egressPublish: port |
Порт, на котором будет запущен сервер |
egressSubscribe/egressPublish: secret |
Ключ для расшифровки зашифрованных паролей |
egressSubscribe/egressPublish: autoCommit |
Значение по умолчанию |
egressSubscribe/egressPublish: useStorageTime |
Значение по умолчанию |
egressSubscribe/egressPublish: delay |
Значение по умолчанию 60000, периодичность проверки (мс) (используется, если установлен |
egressSubscribe/egressPublish: storageTime |
Значение по умолчанию 30000, максимальное время последнего подключения (мс) (используется, если установлен |
egressSubscribe/egressPublish: needInfoAboutAcknowledge |
Если флаг установлен, то ответы о публикации будут приходить только после подтверждения от транспорта, иначе сразу после отправки |
egressSubscribe/egressPublish: ssl.white.list.path |
Путь до файла со списком разрешенных для подключения к EVTA DN сертификатов (проверка происходит, если параметр указан) |
egressSubscribe/egressPublish: charset |
Кодировка сообщений для вычитки REST, по умолчанию UTF-8. В приоритете — значение из соответствующего заголовка вычитанного сообщения |
egressSubscribe/egressPublish: ssl.key.password |
Пароль от приватного ключа |
egressSubscribe/egressPublish: ssl.keystore.location |
Путь до хранилища приватного ключа |
egressSubscribe/egressPublish: ssl.keystore.password |
Пароль от хранилища приватного ключа |
egressSubscribe/egressPublish: ssl.truststore.password |
Пароль от хранилища доверенных сертификатов |
egressSubscribe/egressPublish: ssl.protocol |
Тип протокола SSL, который будет получен из vault. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.cipher.suites |
Перечисление используемых шифров через запятую, по умолчанию – все доступные |
egressSubscribe/egressPublish: ssl.keystore.type |
Тип хранилища, по умолчанию |
egressSubscribe/egressPublish: ssl.truststore.type |
Тип хранилища, по умолчанию |
egressSubscribe/egressPublish: ssl.endpoint.identification.algorithm |
Алгоритм идентификации endpoint подключения для проверки имени host сервера с использованием сертификата сервера. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.conscrypt |
Использование библиотеки concrypt. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.allowed.dn |
Список разрешенных DN сертификатов |
egressSubscribe/egressPublish: ssl.allowed.dn.file |
Расположение файла со списком разрешенных DN сертификатов |
egressSubscribe/egressPublish: ssl.vault.tls.key.password |
Пароль от приватного ключа хранилища |
egressSubscribe/egressPublish: ssl.vault.tls.keystore.location |
Путь до keystore хранилища |
egressSubscribe/egressPublish: ssl.vault.tls.keystore.password |
Пароль от keystore хранилища |
egressSubscribe/egressPublish: ssl.vault.tls.truststore.location |
Путь до truststore хранилища |
egressSubscribe/egressPublish: ssl.vault.tls.truststore.password |
Пароль от truststore хранилища |
egressSubscribe/egressPublish: ssl.vault.tls.protocol |
Для хранилища: тип протокола SSL, который будет получен из vault. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.tls.keystore.type |
Формат файла keystore хранилища. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.tls.truststore.type |
Формат файла truststore хранилища. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.tls.endpoint.identification.algorithm |
Для хранилища: алгоритм идентификации endpoint подключения для проверки имени сервера с использованием сертификата сервера. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.tls.conscrypt |
Для хранилища: использование библиотеки conscrypt. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.tls.allowed.dn |
Для хранилища: список разрешенных DN сертификатов |
egressSubscribe/egressPublish: ssl.vault.tls.allowed.dn.file |
Для хранилища: расположение файла со списком разрешенных DN сертификатов |
egressSubscribe/egressPublish: ssl.vault.address |
HashiCorp Vault URL |
egressSubscribe/egressPublish: ssl.vault.namespace |
Пространство имен HashiCorp Vault |
egressSubscribe/egressPublish: ssl.vault.auth.type |
Тип аутентификации. Доступные значения:approle, token, password, cert. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.auth.mount |
Путь монтирования аутентификации |
egressSubscribe/egressPublish: ssl.vault.auth.role.id |
Role ID для типа аутентификации approle |
egressSubscribe/egressPublish: ssl.vault.auth.secret.id |
Secret ID для типа аутентификации approle |
egressSubscribe/egressPublish: ssl.vault.auth.token |
Токен для типа аутентификации |
egressSubscribe/egressPublish: ssl.vault.auth.username |
Имя пользователя для аутентификации типа |
egressSubscribe/egressPublish: ssl.vault.auth.password |
Пароль для аутентификации типа |
egressSubscribe/egressPublish: ssl.vault.tls.enable |
Наличие защищенного соединения с HashiCorp Vault. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.namespace |
Vault namespace |
egressSubscribe/egressPublish: ssl.vault.pki.mount |
Путь монтирования Vault PKI API. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.pki.role.name |
Имя роли в PKI Engine |
egressSubscribe/egressPublish: ssl.vault.pki.common.name |
Common name для генерации сертификатов |
egressSubscribe/egressPublish: ssl.vault.pki.crl.enable |
Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.pki.email |
Email для получения сертификата из HashiCorp Vault |
egressSubscribe/egressPublish: ssl.vault.pki.alt.names |
Alternative names для генерации сертификатов |
egressSubscribe/egressPublish: ssl.vault.pki.alt.ip |
Alternative ip для генерации сертификатов |
egressSubscribe/egressPublish: ssl.vault.pki.ttl |
TTL для генерации сертификатов |
egressSubscribe/egressPublish: ssl.vault.pki.csr |
CSR для генерации сертификатов |
egressSubscribe/egressPublish: ssl.vault.pki.trust.only |
Загружать только trust store. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.secret.path |
Секретный путь для ключа, keystore и truststore паролей |
egressSubscribe/egressPublish: ssl.vault.secret.key |
Secret key для пароля ключа |
egressSubscribe/egressPublish: ssl.vault.secret.keystore |
Secret key для keystore пароля |
egressSubscribe/egressPublish: ssl.vault.secret.truststore |
Secret key для truststore пароля |
egressSubscribe/egressPublish: ssl.vault.before.expire |
Время до даты экспирации для обновления сертификатов. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.alias.ca |
Псевдоним для CA сертификта в truststore |
egressSubscribe/egressPublish: ssl.vault.alias.key |
Псевдоним для ключа и сертификата в keystore |
egressSubscribe/egressPublish: ssl.vault.engine.version |
Версия API для key-value машины. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.retries |
Количество повторных попыток запроса. Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.retry.interval |
Интервал между попытками (мс). Значение по умолчанию |
egressSubscribe/egressPublish: ssl.vault.timeout |
Таймаут запроса (с). Значение по умолчанию |
urlConfig |
Путь до файла с настройками endpoint, используется для egress |
Пример заполненного файла adapter.conf
{
"batchSize": "1000",
"egressEnabled":"rest",
"egressPublish": {
"ssl.vault.tls.keystore.location": "path/to/vaultclient.jks"
"ssl.vault.tls.keystore.password":"***",
"ssl.vault.tls.key.password":"***",
"ssl.vault.tls.truststore.location": "path/to/vaultclient.jks"
"ssl.vault.tls.truststore.password":"***",
"ssl.protocol":"TLSv1.2",
"ssl.engine.factory.class":"ru.sbt.ss.kafka.VaultSslEngineFactory",
"ssl.vault.auth.type":"CERTIFICATE",
"ssl.keystore.location": "path/to/client.test1.jks",
"ssl.vault.alias.key": "test-1",
"ssl.truststore.location": "path/to/client.test1.jks",
"ssl.vault.address":"https://host:port/",
"ssl.vault.namespace":"",
"ssl.vault.pki.common.name":"KafkaClient1",
"ssl.vault.pki.email":"example@mail.com",
"ssl.vault.pki.alt.names":"localhost",
"ssl.vault.pki.alt.ip":"ip",
"ssl.vault.secret.path":"path/to/certstore",
"ssl.vault.secret.key":"key",
"ssl.vault.secret.keystore":"keystore",
"ssl.vault.secret.truststore":"truststore",
"ssl.vault.engine.version":"1",
"ssl.vault.pki.role.name":"test1",
"ssl.vault.pki.crl.enable": "true",
"host": "localhost",
"port": 9002,
"ssl.white.list.path":"path/to/allowed-dn.txt",
"needInfoAboutAcknowledge": true,
"isHttp2":false
}
"egressSubscribe": {
"ssl.vault.tls.keystore.location": "path/to/certs/vaultclient.jks"
"ssl.vault.tls.keystore.password":"***",
"ssl.vault.tls.key.password":"***",
"ssl.vault.tls.truststore.location": "path/to/certs/vaultclient.jks"
"ssl.vault.tls.truststore.password":"***",
"ssl.protocol":"TLSv1.2",
"ssl.engine.factory.class":"ru.sbt.ss.kafka.VaultSslEngineFactory",
"ssl.vault.auth.type":"CERTIFICATE",
"ssl.keystore.location": "path/to/certs/client.test1.jks",
"ssl.vault.alias.key": "test-1",
"ssl.truststore.location": "path/to/certs/client.test1.jks",
"ssl.vault.address":"https://host:port/",
"ssl.vault.namespace":"",
"ssl.vault.pki.common.name":"KafkaClient1",
"ssl.vault.pki.email":"example@mail.com",
"ssl.vault.pki.alt.names":"localhost",
"ssl.vault.pki.alt.ip":"ip",
"ssl.vault.secret.path":"path/to/certstore",
"ssl.vault.secret.key":"key",
"ssl.vault.secret.keystore":"keystore",
"ssl.vault.secret.truststore":"truststore",
"ssl.vault.engine.version":"1",
"ssl.vault.pki.role.name":"test1",
"ssl.vault.pki.crl.enable": "true",
"host": "localhost",
"port": 9003,
"ssl.white.list.path":"path/to/allowed-dn.txt",
"charset": "UTF-8",
"isHttp2":false
}
"ed": {
"url": "host:port",
"authority": "DN cert",
"ssl.keystore.location":"path/to/grpc-adapter.jks",
"ssl.keystore.password":"***",
"ssl.truststore.location":"path/to/grpc-adapter.jks",
"ssl.truststore.password":"***",
"ssl.protocol":"TLSv1.2",
"ssl.conscrypt": false
},
"urlConfigPath":"path/to/urlConfig.json",
"eventDiscoveryEnabled": false,
"healthCheckPort": "port",
"brokerWhiteListPath": "path/to/allowed-dn2.txt",
"processors": {
"log": {
"format": "auto",
"level": "info"
}
},
"producer": {
"properties": "path/to/prop.properties",
"topic": "testTopic"
},
"producerType": "kafka",
"retries": "3",
"threadCount": 1,
"timeout": "1000"
}
Поддерживаемые типы транспорта для параметров Сonsumer и Producer:#
kafka — Platform V Corax/Apache Kafka;
artemis — брокер Artemis MQ;
ibm_mq — брокер IBM MQ;
rabbit — брокер RabbitMQ.
Функционал, связанный с IBM MQ, не дорабатывается.
В дальнейшем планируется добавление других типов транспорта.
Параметры получателя и отправителя состоят из 2-х частей:
Параметры, специфичные для EVTA, задаются в том же объекте Consumer или Producer.
Параметры, специфичные для библиотеки транспорта, передаются в отдельном файле через параметр properties объектов Consumer или Producer.
Возможные варианты настроек Consumer и Producer указаны в таблице:
Тип брокера |
Platform V Corax/Apache Kafka |
artemis |
ibm_mq |
rabbit |
|---|---|---|---|---|
Consumer |
topic — имя входящего topic |
queue — имя входящей очереди |
queue — имя входящей очереди |
properties — путь до файла с параметрами, содержащего: |
Producer |
topic — имя исходящего topic |
queue — имя входящей очереди, |
queue — имя входящей очереди |
properties — путь до файла с параметрами, содержащего: |
Пример конфигурации kafka-kafka#
В данном случае EVTA используется как перекладчик из одного topic EVTD в другой.
Файл adapter.conf:
{
...
batchSize: 1000
timeout: 10000
threadCount: 2
healthCheckPort: 8081
consumer: {
topic: "input"
properties: "conf/consumer.properties"
}
producer: {
topic: "output"
bufferSize: 1000
queryPartitionsInterval: 5000
properties: "conf/producer.properties"
}
...
}
Файл consumer.properties:
bootstrap.servers = "host.docker.internal:9092"
client.id ="consumer.id"
group.id = "consumer.group.id"
Файл producer.properties:
bootstrap.servers = "host.docker.internal:9092"
client.id = "producer.id"
group.id = "producer.group.id"
Пример конфигурации mq-kafka#
В данном случае EVTA используется как шлюз между брокером очередей IBM MQ и брокером EVTD.
Файл adapter.conf:
{
...
batchSize: 1000
timeout: 10000
retries: 1
threadCount: 8
healthCheckPort: 8087
consumerType: "ibm_mq"
consumer: {
queue: "TEST"
protocol: "TLSv1"
conscrypt: false
keyStorePath: "***"
keyStorePassword: "*******"
trustStorePath: "***"
trustStorePassword: "*******"
properties: "conf/mq.properties"
}
producer: {
topic: "adapter-test-output"
queryPartitionsInterval: 60000
properties: "conf/kafka.properties"
}
...
}
Файл mq.properties:
hostName = localhost
port = 1420
queueManager = PMS.PPRB.IM1
channel = CEP.SVRCONN
SSLCipherSuite = TLS_RSA_WITH_AES_128_CBC_SHA
Файл kafka.properties:
bootstrap.servers = "host:port"
group.id = vault-test
client.id = vault-client
security.protocol = SSL
ssl.endpoint.identification.algorithm = ""
ssl.keystore.location = /path/to/client.test.jks
ssl.truststore.location = /path/to/client.test.jks
ssl.protocol = TLSv1.2
ssl.engine.factory.class = ru.sbt.ss.kafka.VaultSslEngineFactory
ssl.vault.address = "https://host:port/"
ssl.vault.auth.type = APPROLE
ssl.vault.auth.role.id = <role id>
ssl.vault.auth.secret.id = <secret id>
ssl.vault.pki.role.name = <role name>
ssl.vault.pki.common.name = <common name>
ssl.vault.secret.path = <path to secret>
ssl.vault.secret.key = <key>
ssl.vault.secret.keystore = <keystore>
ssl.vault.secret.truststore = <truststore>
ssl.vault.engine.version = 1
ssl.vault.tls.truststore.location = /path/to/vaultclient.jks
ssl.vault.tls.truststore.password = <password>
Пример конфигурации kafka-artemis#
В данном случае EVTA используется как шлюз между брокером очередей SMBX и брокером EVTD.
Файл adapter.conf:
{...
batchSize: 1000
timeout: 10000
threadCount: 2
healthCheckPort: 8087
producerType: artemis
consumer: {
topic: "adapter-test-output"
queryPartitionsInterval: 60000
properties: "conf/kafka.properties"
}
producer: {
queue: "TEST"
properties: "conf/artemis.conf"
}
...
}
Файл artemis.conf:
{
connectors: "localhost:61616, 127.0.0.1:61617"
connectorConfigs: {
sslEnabled: "true"
keyStorePath: "***"
keyStorePassword: "*******"
trustStorePath: "***"
trustStorePassword: "*******"
}
factory: {
blockOnQueue: false
loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
reconnectAttempts: 1
recoveryInterval: 2000
}
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}
Файл kafka.properties:
bootstrap.servers = "host:port"
group.id = vault-test
client.id = vault-client
security.protocol = SSL
ssl.endpoint.identification.algorithm = ""
ssl.keystore.location = /path/to/client.test.jks
ssl.truststore.location = /path/to/client.test.jks
ssl.protocol = TLSv1.2
ssl.engine.factory.class = ru.sbt.ss.kafka.VaultSslEngineFactory
ssl.vault.address = "https://host:port/"
ssl.vault.auth.type = APPROLE
ssl.vault.auth.role.id = <role id>
ssl.vault.auth.secret.id = <secret id>
ssl.vault.pki.role.name = <role name>
ssl.vault.pki.common.name = <common name>
ssl.vault.secret.path = <path to secret>
ssl.vault.secret.key = <key>
ssl.vault.secret.keystore = <keystore>
ssl.vault.secret.truststore = <truststore>
ssl.vault.engine.version = 1
ssl.vault.tls.truststore.location = /path/to/vaultclient.jks
ssl.vault.tls.truststore.password = <password>
Пример конфигурации artemis-artemis#
В данном случае EVTA используется как шлюз между брокером очередей SMBX.
Файл adapter.conf:
{
"batchSize": "1000",
"egressEnabled":"none",
"eventDiscoveryEnabled": false,
"processors": {
"log": {
"format": "auto",
"level": "info"
}
},
"producer": {
"properties": "artemis.conf",
"queue": "TO_TEST"
},
"producerType": "artemis",
"consumer": {
"properties": "artemis.conf",
"queue": "FROM_TEST"
},
"consumerType": "artemis",
"retries": "3",
"threadCount": 1,
"timeout": "1000"
}
Файл artemis.conf:
{
connectors: "localhost:61616"
connectorConfigs: {
sslEnabled: "true"
keyStorePath: "/path/to/cert.jks"
keyStorePassword: "password"
trustStorePath: "/path/to/cert.jks"
trustStorePassword: "password"
}
factory: {
blockOnQueue: false
loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
reconnectAttempts: 1
recoveryInterval: 2000
confirmationWindowSize: 1
}
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}
Пример конфигурации mq-mq#
В данном случае EVTA используется как шлюз между брокерами очередей IBM MQ.
Файл adapter.conf:
{
"batchSize": "1000",
"egressEnabled":"none",
"eventDiscoveryEnabled": false,
"processors": {
"log": {
"format": "auto",
"level": "info"
}
},
"producer": {
"properties": "mq.properties",
"queue": "TO_TEST"
},
"producerType": "ibm_mq",
"consumer": {
"properties": "mq.properties",
"queue": "FROM_TEST"
},
"consumerType": "ibm_mq",
"retries": "3",
"threadCount": 1,
"timeout": "1000"
}
Пример файла mq.properties:
hostName = localhost
port = 1414
queueManager = QM1
channel = DEV.APP.SVRCONN
Пример конфигурации artemis-mq#
В данном случае EVTA используется как шлюз между брокерами очередей SMBX и IBM MQ.
Файл adapter.conf:
{
"batchSize": "1000",
"egressEnabled":"none",
"eventDiscoveryEnabled": false,
"processors": {
"log": {
"format": "auto",
"level": "info"
}
},
"producer": {
"properties": "mq.properties",
"queue": "TO_TEST"
},
"producerType": "ibm_mq",
"consumer": {
"properties": "artemis.conf",
"queue": "FROM_TEST"
},
"consumerType": "artemis",
"retries": "3",
"threadCount": 1,
"timeout": "1000"
}
Файл artemis.conf:
{
connectors: "localhost:61616"
connectorConfigs: {
sslEnabled: "true"
keyStorePath: "/path/to/cert.jks"
keyStorePassword: "password"
trustStorePath: "/path/to/cert.jks"
trustStorePassword: "password"
}
factory: {
blockOnQueue: false
loadBalancingPolicyClassName: org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
reconnectAttempts: 1
recoveryInterval: 2000
confirmationWindowSize: 1
}
sslFactoryConfigPath: "src/test/resources/vault_config.conf"
}
Пример файла mq.properties:
hostName = localhost
port = 1414
queueManager = QM1
channel = DEV.APP.SVRCONN
Валидация сообщений и настройка логирования#
Обработка сообщения является промежуточным этапом передачи между получением сообщения из источника и отправкой в точку назначения. Задать шаги обработки можно задать через блок параметров processors в конфигурационном файле adapter.conf.
Каждый этап обработки сообщения не зависит от наличия других шагов и является атомарным процессом. Шаги обработки передаются в этом параметре в формате ключ-значение, где ключ — идентификатор шага обработки, значение — параметры шага.
EVTA предоставляет встроенные шаги обработки сообщений:
xsd — проверка тела сообщения по XSD схеме, параметры:
schemas — массив/список путей до XSD схем;
encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;
jsonSchema — проверка тела сообщения по JSON схеме, параметры:
schemas — массив/список путей до JSON схем;
encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;
avro проверка тела сообщения по AVRO схеме, параметры:
schemas — массив/список путей до avro схем;
encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8
log — логирование тела сообщения в лог EVTA, параметры:
format — формат тела сообщения, поддерживаемые значения xml, json, либо auto для автоопределения формата, по умолчанию auto;
level — уровень логирования сообщения, по умолчанию info;
encoding — кодировка тела сообщения и файла схемы, по умолчанию UTF-8;
logOnlyHeaders — если имеет значение
true, то логируются только заголовки сообщений. Значение по умолчаниюfalse;
textConversion — перекодировка тела сообщения из одной кодировки в другую:
input — кодировка входящего сообщения;
output — кодировка исходящего сообщения;
header — фильтрация заголовков сообщения:
enabled — если имеет значение
true, то фильтрация заголовков сообщения включена. Значение по умолчаниюtrue;remove — удаление всех заголовков сообщения. Значение по умолчанию
false;include — массив/список заголовков для включения. Значение по умолчанию пустой список;
exclude — массив/список заголовков для исключения. Значение по умолчанию пустой список;
keyFilter — фильтрация ключа сообщения:
enabled — если имеет значение
true, то фильтрация ключа сообщения включена. Значение по умолчаниюfalse.
Шаги xsd, jsonSchema, avro выполняются успешно при первой успешной проверке тела сообщения по схеме. Если сообщение не соответствует ни одной схеме, то возникает ошибка обработки. В случае успеха в результате обработки возвращается оригинальное сообщение без изменений.
Шаг log всегда работает без ошибок и не влияет на обработку сообщений, любое перехватываемое исключение записывается в логах EVTA без прерывания дальнейшей обработки.
В шаге header при задании нескольких параметров выполняется только параметр с наивысшим приоритетом (приоритеты в порядке уменьшения — enabled, remove, include, exclude).
Пример конфигурации с валидацией по XSD схеме и логированием тела сообщения#
{
batchSize: 1000
timeout: 10000
retries: 1
threadCount: 8
healthCheckPort: 8087
consumerType: "kafka"
consumer: {
topic: "input"
properties: "conf/consumer.properties"
}
producer: {
topic: "adapter-test-output"
queryPartitionsInterval: 60000
properties: "conf/kafka.properties"
}
processors: {
xsd: {
schemas: ["/path/to/schema/one.xsd", "/path/to/schema/two.xsd"]
}
log: {
format: "xml"
}
}
}