Руководство прикладного разработчика#
Термины и определения#
Термин/аббревиатура |
Определение |
|---|---|
GET |
Метод запроса, поддерживаемый HTTP-протоколом, для получения содержимого указанного ресурса |
gRPC |
Высокопроизводительный фреймворк, разработанный компанией Google для вызова удаленных процедур (RPC) |
IP-адрес |
Internet Protocol Address, уникальный числовой идентификатор устройства в компьютерной сети |
JKS |
Java Key Store, файловый формат для хранения конфиденциальных данных |
POST |
Метод запроса, поддерживаемый HTTP-протоколом, при котором веб-сервер принимает данные, заключенные в тело сообщения, для хранения |
PUT |
Метод запроса, поддерживаемый HTTP-протоколом, для загрузки содержимого на указанный ресурс |
REST |
Representational State Transfer, архитектурный стиль взаимодействия компонентов распределенного приложения |
SpEL |
Spring Expression Language, язык выражений Spring, который обеспечивает вычисления во время выполнения |
SSL |
Secure Sockets Layer, криптографический протокол, предназначенный для защиты обмена данными в сети |
TLS |
Transport Layer Security, криптографический протокол защиты транспортного уровня, предназначенный для защиты обмена данными в сети |
URL |
Uniform Resource Locator, унифицированный адрес ресурса |
АС |
Автоматизированная система |
ПСИ |
Приемо-сдаточные испытания |
Istio |
Компонент, входящий в состав продукта «Platform V Synapse Service Mesh» инфраструктурного уровня для облегчения связи между сервисами с использованием прокси |
ЦА |
Центр сертификации или удостоверяющий центр (от англ. Certification authority, CA) — сторона, чья честность неоспорима, а открытый ключ широко известен. Задача центра сертификации — подтверждать подлинность ключей шифрования с помощью сертификатов электронной подписи |
Системные требования#
Для установки программного компонента Kafka Gateway (далее — Шлюза Kafka) в составе прикладного дистрибутива должно быть выполнено следующее:
Развернут и настроен кластер Kubernetes 1.17+ или Red Hat OpenShift 4.5.13+ в соответствии с требованиями, предъявляемыми к Платформе.
В кластере создан проект (namespace), в котором будет разворачиваться шлюз.
В проекте создана учетная запись с правами на загрузку артефактов.
Docker-образ шлюза размещен в целевом Docker-репозитории по ссылке, указанной в конфигурационном артефакте Deployment.
В проект добавлен Secret для загрузки Docker-образов из целевого Docker-репозитория.
Подготовлен комплект настроенных конфигурационных артефактов.
Брокеры Kafka настроены и готовы к работе, есть доступ к ним со стороны кластера Kubernetes или Red Hat OpenShift.
Созданы целевые topic(s) сообщений, необходимый доступ к ним предоставлен.
В проекте имеются свободные ресурсы по лимитам и реквестам не менее, чем зарезервировано в конфигурационных артефактах.
При установке с использованием консоли, на рабочем месте должен быть установлен клиент Kubernetes или Red Hat OpenShift.
Минимальные ресурсы, необходимые для запуска Шлюза Kafka
Контейнер |
CPU Request |
Memory Request |
CPU Limit |
Memory Limit |
|---|---|---|---|---|
Приложение |
400m |
512Mi |
400m |
512Mi |
При опциональном использовании совместно со Шлюзом Kafka Сайдкара компонента Сервисный прокси (входящего в состав продукта «Platform V Synapse Service Mesh») минимальные ресурсы, необходимые для запуска, следует уточнить в соответствующей документации используемого компонента.
Общие сведения#
Компонент Шлюз Kafka предназначен для организации взаимодействия компонентов Synapse с внешними АС с помощью topic(s) Kafka по протоколу GRPC или REST.
Подключение к брокерам Kafka осуществляется без egress-gateway Istio.
Подключение и конфигурирование#
Для установки экземпляра Шлюза Kafka на стенд, необходимо в соответствующий проект загрузить конфигурационные артефакты:
Артефакт |
Содержание |
Описание |
|---|---|---|
Deployment |
Параметры запуска контейнера приложения в Kubernetes или Red Hat OpenShift |
Наименование экземпляра приложения, ссылка на образ контейнера приложения, запрашиваемые ресурсы, публикуемые порты, параметры liveness и readiness проб, необходимость и параметры подключения sidecar контейнеров, точки монтирования конфигурационных артефактов в файловую систему контейнера. |
Config Map |
application.yml |
Файл содержащий параметры конфигурации приложения |
Secret |
ssl.yml |
Файл содержащий параметры настройки ssl. Содержит конфиденциальные данные (пароли к приватным ключам), поэтому загружается в виде секрета |
Secret |
Ключи и сертификаты |
Файлы .jks содержащие ключи и сертификаты для подключения к брокерам Kafka по ssl. Содержит конфиденциальные данные поэтому загружается в виде секрета |
Service |
Артефакт для регистрации приложения в service discovery Kubernetes или Red Hat OpenShift |
Селекторы и порты для подключения приложения к механизмам распределения трафика Kubernetes или Red Hat OpenShift |
Service Entry |
Артефакт для регистрации внешнего сервиса в Kubernetes или Red Hat OpenShift |
Содержит адреса хостов и номера портов для подключения к брокерам Kafka. Если для нужных хостов и портов Service Entry уже установлены в проекте, то грузить его повторно не требуется |
Последовательность загрузки артефактов:
Config Map c application.yml
Secret c ssl.yml
Secret c ключами
Service Entry
Deployment
Service
Порядок действий при загрузке артефактов в проект описаны в Руководстве по установке .
В случае использования SecMan, все конфигурации с типом Secret должны храниться в SecMan.
Карточка адаптера#
Параметр |
Значение |
|---|---|
Тип соединения |
Адаптер может работать в двух направлениях: |
Протоколы соединений |
REST, gRPC, Kafka |
Обязательные заголовки входящего запроса (по GRPC/REST) |
· x-synapse-operationname - имя операции |
Дополнительные заголовки входящего запроса (по GRPC/REST) |
· x-synapse-kafka-topic - Имя topic(s), в который будет отправлено сообщение. |
Обязательные заголовки исходящего запроса (по Kafka) |
Для GRPC: |
Дополнительные заголовки исходящего запроса (по Kafka) |
Для GRPC: |
Описание трансформации адаптера |
Заголовки |
Подпись сообщения/проверка подписи с помощью сертификата/OTT |
При настроенном SSL подключении к Kafka и заданных параметрах: spring.kafka.producer.properties.interceptor.classes, spring.kafka.producer.properties.interceptor.signature.certificate.mode Шлюз АС Kafka будет добавлять цифровую подпись и дополнительную информацию в заголовки сообщения, используя указанный в настройках сертификат. |
Полное описание конфигурации (/deployments/config/application.yml)
logging.file.level: <OFF/INFO/DEBUG, off - логирование в файл отключено, info - в файл логируется информация без тела сообщения, debug - в файл логируется вся информация, включая тело сообщения>
logPath: <Путь до файла с логом, по умолчанию /opt/synapse/logs/messages>
grpc:
server:
server-port: <Порт, на котором будет поднят GRPC сервер>
client:
settings:
default:
hostname: <Имя сервиса в Kubernetes или Red Hat OpenShift, которое должен вызвать адаптер после вычитки сообщения из Kafka>
port: <Порт сервиса в Kubernetes или Red Hat OpenShift>
signature-verification-failed-url: # вызов API при ошибке проверки подписи сообщения
hostname: <Имя сервиса в Kubernetes или Red Hat OpenShift, которое должен вызвать адаптер после вычитки сообщения из Kafka>
port: <Порт сервиса в Kubernetes или Red Hat OpenShift>
server:
port: <Порт, на котором будет поднят Spring Tomcat, используется для проверки жизнеспособности приложения>
tomcat:
max-swallow-size: <Максимальный размер сообщения, которые может быть принять по REST API, например 40MB>
threads:
max: <Максимальное кол-во потоков для обработки REST вызовов>
min-spare: <Минимальное кол-во потоков для обработки REST вызовов, которое всегда будет доступно>
logging:
level:
org:
springframework: OFF/ERROR/INFO/DEBUG # уровень логирования для springframework
root: OFF/ERROR/INFO/DEBUG # уровень логирования для root логгера
com.sbt.synapse.kafka.service.api.BulkController: OFF/ERROR/INFO/DEBUG # уровень логирования для контроллера обработки batch сообщений
com.sbt.synapse.kafka.service.KafkaService: OFF/ERROR/INFO/DEBUG # уровень логирования для модуля отправки сообщений в Kafka
com.sbt.synapse.kafka.integration.utils.BulkUtils: OFF/ERROR/INFO/DEBUG # уровень логирования для обработчика batch сообщений
spring:
main:
banner-mode: off/console/log # управление вывода в журнал баннера spring
profiles: production # используемый профиль приложения spring
kafka:
rest:
mode: true/false # настройка адаптера для работы по REST
timeout: 30000 # таймаут в мс для REST-вызова
max-connections: 100 # размер пула коннектов для REST-вызовов (при чтении из topic(s))
default-url: "http://localhost:1414/test" # URL, который будет вызван по умолчанию
signature-verification-failed-url: "http://localhost:1414/fail" # URL, который будет вызван при ошибке проверки подписи сообщения
default-method: get/post # метод, который будет использован
default-content-type: "application/json" # значение заголовка Content Type по умолчанию
controller: /receive # путь, по которому будет доступен вызов контроллера API для записи в topic(s), по умолчанию "/". >
ssl:
key-store-location: <Путь до keystore, оставить пустым, если TLS не требуется>
trust-store-location: <Путь до truststore, оставить пустым, если TLS не требуется>
protocol: <Версия протокола SSL подключения, оставить пустым, если TLS не требуется>
producer: <Опциональный блок, если не задан, то сообщения в topic(s) не отправляются>
topics: # список топиков и отдельные настройки kafka producer для него
- topic: <имя топика>
properties: <Список настроек kafka producer, ключ значение>
simpleKafkaHeaderMapper: <true/false использовать simpleKafkaHeaderMapper для исключения заголовка spring_json_header_types, по умолчанию false>
sensitive-headers: <Перечисление через запятую заголовков, регистр которых важен при отправке kafka-сообщения. Передаваемые заголовки сравниваются по equalsIgnoreCase, и при совпадении приводятся к указанному виду.>
batch:
errorStatus: <Статус код ошибки, которая будет возвращаться клиенту при вызове bulk_api, по-умолчанию 200>
separator: <Разделитель, с помощью которого будут разделены пришедшие сообщения, по умолчанию \n>
licenseResponsePath: <Путь до файла с ответом при проверке доступности шлюза сервисом filebeat>
skipList: <Список для фильтрации сообщений. Если сообщение содержит хотя бы одно вхождение из списка "skipList", то оно пропускается>
- <string, например "\"_index\"">
bootstrap-servers: <Server:Port брокера Kafka, в который требуется помещать сообщения>
topic-default: <Имя topic, в который требуется помещать сообщения по умолчанию>
key-serializer: org.apache.kafka.common.serialization.StringSerializer # сериализатор для ключей сообщений в Kafka
value-serializer: org.apache.kafka.common.serialization.StringSerializer # сериализатор для сообщений в Kafka
properties: # настройки для топика по умолчанию
# Подключить интерсептор (Producer)
interceptor.classes: ru.sbt.ss.kafka.interceptors.KafkaSignatureProducerInterceptor
interceptor.signature.certificate.mode: <public-key или certificate-serial, public-key - использование публичного ключа для проверки подписи на стороне потребителя (то есть в заголовках передается ключ), certificate-serial - использование серийного номера сертификата для проверки подписи (то есть в заголовках передается серийный номер)>
# Все настройки ниже опциональны:
# Настроить алгоритм подписи, поддерживается только алгоритм SHA256-RSA
interceptor.signature.certificate.algorithm: SHA256WithRSA
# Настроить хранилище сертификатов
# Интерсептор может использовать отдельное хранилище сертификатов:
interceptor.signature.certificate.ssl.keystore.location = ssl/keystore.jks
# Выбрать сертификат из хранилища, указав его alias (по умолчанию используется первый)
interceptor.signature.certificate.key.alias =
# Настроить имена системных заголовков
# Имя заголовка с подписью сообщения
interceptor.signature.header = signature
# Префикс для имен системных заголовков
interceptor.signature.attributes.prefix = signature.
# Имя загловка с используемым алгоритмом подписи
interceptor.signature.certificate.algorithm.header=algorithm
# Имя заголовка с публичным ключом для проверки подписи
interceptor.signature.certificate.public.key.header=public.key
consumer: <Опциональный блок, если не задан, то сообщения из topic(s) не вычитываются>
threads: <Кол-во потоков consumer>
max-poll-records: <Максимальное кол-во сообщений, которое будет прочитано в batch режиме, обязательный параметр для режима batch>
transactional: <true/false - если true, то в случае ошибки доставки сообщения фиксировать ли offset topic(s) Kafka (в этом случае адаптер будет бесконечно пробовать читать недоставленные сообщения и отправлять по grpc/rest)>
repeat-after-error-ms: <в случае transactional=true, период повторной попытки вычитки и отправки сообщения при ошибке>
bootstrap-servers: <Server:Port брокера Kafka, из которого требуется получать сообщения>
topics: <Список topic(s), по которым необходимо выполнять чтение>
- topic: <Имя topic(s)>
group-id: <Имя consumer группы>
enabled: <true/false - включен ли слушатель по данному topic(s), можно изменять в runtime>
ratelimiter: <Настройки лимиттера, если включен, то подключается ограничитель по полученным сообщениям в секунду tpsPerSecond (если не задан tpsPerSecond, то используется tpsPerMinute) и шлюз не читает больше данного порога>
tpsPerMinute: <Максимальное кол-во сообщений в минуту, которое может прочитать шлюз>
tpsPerSecond: <Максимальное кол-во сообщений в секунду, которое может прочитать шлюз (приортетнее чем tpsPerMinute, если заданы оба значения)>
enabled: <true/false - включен ли лимиттер по данному topic(s)>
properties: <Список настроек консьюмера, ключ значение>
# Подключить интерсептор (Consumer)
interceptor.classes: ru.sbt.ss.kafka.interceptors.KafkaSignatureConsumerInterceptor,com.sbt.synapse.kafka.service.SignatureValidateConsumerInterceptor
interceptor.signature.service.class: # класс, который будет использоваться для подписи/проверки подписи, н-р: ru.sbt.ss.kafka.ott.OttSignatureService
# Все настройки ниже опциональны:
# Настроить режим работы консюмера при ошибках* (по умолчанию failOnValue и только в этом режиме поддерживается передача сообщения дальше на API)
interceptor.signature.mode: #failOnConsume|filter|failOnValue
# Настроить удаление системных заголовков из сообщения
interceptor.signature.remove.headers = false
# Настроить имена системных заголовков
# Имя заголовка с подписью сообщения
interceptor.signature.header = signature
# Префикс для имен системных заголовков
interceptor.signature.attributes.prefix = signature.
# Имя загловка с используемым алгоритмом подписи
interceptor.signature.certificate.algorithm.header=algorithm
# Имя заголовка с публичным ключом для проверки подписи
interceptor.signature.certificate.public.key.header=public.key
batching: <true/false - Режим чтения и отправки сообщений пачками>
batchingOneMessage: <true/false - Режим работы, когда сообщения из топика Kafka читаются пачкам, но передаются дальше по одному (вызов REST/GRPC осуществляется для каждого сообщения из пачки)>
filter: # Не поддерживается для батчей
rules: # <Имя topic(s)>:<условие фильтра в SPEL, если удовлетворяет условию фильтра, то сообщение отправляется, иначе - нет>
'SrvDistributeMergePrivateClient.Request': "#headers.get('x-synapse-type') == 'CASHIN' && #headers.get('HW_ERROR') == 'true'"
'SrvDistributeMergePrivateClient.Response': '#HW_ERROR == "true"'
properties: # Список общих настроек, которые будут определены как для kafka producer, так и для kafka consumer
security.protocol: <Протокол подключения, исключить параметр, если TLS не требуется>
ssl.endpoint.identification.algorithm: "" # отключение проверки серверного сертификата
ssl.enabled.protocols: TLSv1.2 # список протоколов разрешенных к использованию при создании подключения
transform-xml-json:
services:
srvupdateanddedubclient: #название сервиса из x-synapse-operationname
transform: true #<Нужно ли трансформировать выполнять трансформацию JSON-XML> # не является целевым вариантом использования шлюза, трансформацию необходимо выполнять на сервисе трансформации
unTypeFieldsXmlToJson:
SPName: spName
unTypeFieldsJsonToXml:
personUpdateStatuses: PersonUpdateStatus
encodeBodyToBase64: false # Кодировать ли тело сообщения, сразу после получения из Kafka в base64. Работает только при получении сообщений из Kafka в одиночном режиме. Может быть полезно для работы с бинарными сообщениями. Другие трансфорамации не будут работать. По умолчанию false.
topics-routing: # правила, по которым необходимо маршрутизировать сообщения вычитанные из topic(s) в формате <имя topic(s)>: <маршрут>
routes:
<topic_name>: service1
<topic_name2>: service2
routing:
# правила для parsing(s) сообщения входящего по grpc/rest
api:
# правило определения маршрута
routeName:
valueFrom:
# значение маршрута будет взято из тела сообщения по путь $.header.evtType или из значение заголовока x-synapse-operationname
# приоритет отдается первому определенному
- type: fromBody
value: $.header.evtType
- type: fromHeader
value: x-synapse-operationname
# список маршрутов по routeName, полученному в предыдущем блоке routeName.valueFrom
# если не получилось найти соответствие routeName по маршрутам, то будет использован маршрут default
routeList:
- routeName: process
# SPEL выражение, куда отправлять сообщение
destinationExpression: ('sytester_rq_process')
variables:
# список переменных, на основе которых можно построить destinationExpression и заполнить x-synapse-rquid/x-synapse-operationname
# переменные из данного блока будут добавлены в заголовки сообщения
- name: OperationName
valueFrom:
- type: fromBody
value: $.header.evtType
- type: fromHeader
value: x-synapse-operationname
- type: fromConst
value: const
- name: RqUID
valueFrom:
- type: fromBody
value: $.header.evtId
- routeName: cancel
destinationExpression: ('pprbacq-' + variables['OperationName'] + '-rq')
variables:
- name: OperationName
valueFrom:
- type: fromBody
value: $.header.evtType
- name: RqUID
valueFrom:
- type: fromBody
value: $.header.evtId
- routeName: default
destinationExpression: ('sytester_rq')
variables:
- name: OperationName
valueFrom:
- type: fromBody
value: $.header.evtType
- name: RqUID
valueFrom:
- type: fromBody
value: $.header.evtId
#правила для parsing(s) сообщения вычитанного из Kafka-topic(s), routing сообщений требует передачи заголовка топика (includeHeaders.kafka: kafka_receivedTopic), если исключаются все заголовки сообщения при чтении из топика (excludeHeaders.kafka: all)
kafka:
routeName:
valueFrom:
- type: fromBody
value: $.header.evtType
routeList:
- routeName: AepPublisher
destinationExpression: ('http://sytester-http-stub:8237/AepPublisher')
variables:
- name: OperationName
valueFrom:
- type: fromBody
value: $.header.evtType
- name: RqUID
valueFrom:
- type: fromBody
value: $.header.evtId
- name: Header
valueFrom:
- type: fromHeader
value: test-header
- name: Const
valueFrom:
- type: fromConst
value: test-const
- routeName: AepPublisher2
destinationExpression: ('http://sytester-http-stub:8237/AepPublisher2')
variables:
- name: OperationName
valueFrom:
- type: fromBody
value: $.header.evtType
- name: RqUID
valueFrom:
- type: fromBody
value: $.header.evtId
- name: Header
valueFrom:
- type: fromHeader
value: test-header
- name: Const
valueFrom:
- type: fromConst
value: test-const
- routeName: default
destinationExpression: ('http://sytester-http-stub:8237/default')
variables:
- name: OperationName
valueFrom:
- type: fromBody
value: $.header.evtType
- name: RqUID
valueFrom:
- type: fromBody
value: $.header.evtId
- name: Header
valueFrom:
- type: fromHeader
value: test-header
- name: Const
valueFrom:
- type: fromConst
value: test-const
audit:
enabled: true
service:
host: <адрес сервера аудита>
metamodel-route: <endpoint для регистрации метамодели аудита>
event-route: <endpoint для фиксации событий аудита>
metamodel:
version: <версия метамодели>
module-id: KFGT # идентификатор модуля
retry:
max-attempts: 5 # кол-во попыток регистрации метамодели
wait-duration: 3000 # время в миллисекундах между попытками регистрации метамодели
event:
retry:
max-attempts: 5 # кол-во попыток фиксации событий аудита
wait-duration: 3000 # время в миллисекундах между попытками фиксации
buffer:
size: 1000 # размер буфера событий, которые необходимо отправить в аудит
listener:
pool-size: 1 # кол-во слушателей, которые читают буфер и отправляют события в аудит
period: 2000 # периодичность в миллисекундах, как часто слушатели должны опрашивать буфер
tracing:
addPrefixForHeaders: <true/false - Нужно ли добавлять префикс x-synapse к HTTP/GRPC заголовкам, которые не имеют префикса x-, по умолчанию true>
generateXB3Headers: <true/false - Нужно ли генерировать XB3 заголовки>
generate128bitTraceId: <true/false - Формировать в 128bit виде>
tracingHeaders: # tracingHeaders - является неактуальным блоком для добавления заголовков, необходимо использовать блок variables в блоке routing для добавления заголовков
<Название заголовка, который необходимо добавить в GRPC/REST заголовок>: {type: <Правило получение значения (fromBody, fromConst)>, value: <Значение>}
excludeHeaders:
api:
<Названия заголовков, которые необходимо пропустить, или all для исключения всех заголовков (за исключением заголовков из includeHeaders.api) при вызове адаптера по HTTP/GRPC>
kafka:
<Названия заголовков, которые необходимо пропустить, или all для исключения всех заголовков (за исключением заголовков из includeHeaders.kafka) при чтении из Kafka-topic(s) (фильтрация сообщений не будет доступна, если указан режим all и includeHeaders.kafka не содержит ни одного правила)>
includeHeaders:
api: <работает при excludeHeaders.api: - all>
<Названия заголовков, которые необходимо оставить при вызове адаптера по HTTP/GRPC>
kafka: <работает при excludeHeaders.kafka: - all>
<Названия заголовков, которые необходимо оставить при чтении из Kafka-topic(s)>
encodeHeaders:
api:
<Названия заголовков, значение которых необходимо закодировать в base64 при вызове адаптера по HTTP/GRPC>
kafka:
<Названия заголовков, значение которых необходимо закодировать в base64 при чтении из Kafka-topic(s)>
Поведение в случае ошибок с signature при выборе режимов в interceptor.signature.mode: failOnValue
Если у сообщения отсутствуют signature или они невалидные, то в заголовок сообщения добавляются signature-error-topic, signature-error-partition, signature-error-offset, а вместо сообщения пишется строка с сообщением об ошибке отсутствия/невалидности подписи
Пример сообщения при отсутствии signature: {"status": "There are no signature headers in the message","topic": "consumer_test","partition": "0","offset": "769"}
Пример сообщения при невалидности signature: {"status": "The message failed signature verification","topic": "consumer_test","partition": "0","offset": "769"}
Пример добавленных заголовков: [signature-error-topic=[consumer_test], signature-error-offset=[769], signature-error-partition=[0]];
filter
В логах пишется ошибка, сообщение не отправляется
Пример ошибки без signature:
[ERROR] [kafkaListener0-0-C-1] [r.s.s.k.i.KafkaSignatureConsumerInterceptor] [T:] - Skipping record: Error while processing record(topic: consumer_test, partition: 0, offset: 682): Couldn't find attribute with key 'signature' in record's headers.
Пример ошибки при невалидном signature:
[ERROR] [kafkaListener0-0-C-1] [r.s.s.k.i.KafkaSignatureConsumerInterceptor] [T:] - Skipping record: Error while processing record(topic: consumer_test, partition: 0, offset: 771): Failed to validate record with SignatureContainer(signature=check, attributes={}).
failOnConsume
Ничего в логах не пишется и сообщение не отправляется
**Описание ssl конфигурации (/deployments/config/ssl/ssl.yml) - должна быть определена как Secret или **
spring:
kafka:
ssl:
key-password: <Пароль от ключа клиента>
key-store-password: <Пароль к keystore>
trust-store-password: <Пароль к truststore>
producer:
properties:
interceptor.signature.certifficate.ssl.keystore.password = <Пароль к keystore>
interceptor.signature.certificate.ssl.key.password = <Пароль к key>
Миграция на текущую версию#
Для миграции на текущую версию требуется:
Уточнить в разделе Примечания к релизу необходимость внесения изменений в конфигурацию шлюза при переходе на текущую версию.
При необходимости подготовить Config map с новой конфигурацией.
Подготовить артефакт Deployment, в котором заменить ссылку на Docker-образ шлюза в репозитории ссылкой на Docker-образ с текущей версией.
Если это указано в разделе Примечания к релизу, изменить значения выделяемых ресурсов.
Остановить шлюз.
Загрузить новую конфигурацию шлюза.
Загрузить новый Deployment.
Запустить шлюз (если в Deployment указано количество реплик > 0, шлюз запустится автоматически).
Дополнительно:
Порядок действий при обновлении - см. в разделе Руководство по установке.
Остановка/запуск шлюза - см. в разделе Руководство по системному администрированию
Быстрый старт#
Создать ConfigMap application.yml, предварительно указав:
подключения к брокерам Kafka:
spring.kafka.producer.bootstrap-servers: <hostname kafka broker>:<port>;spring.kafka.producer.topic-default: <topic name>;spring.kafka.consumer.bootstrap-servers: <hostname kafka broker>:<port>;spring.kafka.consumer.topics: <topic names>;
порт приложения:
server.port: <port>;
сервер и клиент gRPC:
grpc.server.server-port: <port>;grpc.client.settings.default.port: <port>;grpc.client.settings.default.hostname: <hostname>;
Пример минимальной конфигурации ConfigMap с application.yml
apiVersion: v1 data: application.yml: |- grpc: server: server-port: 5454 client: settings: default: hostname: localhost port: 5556 server: port: 8787 logging: level: org: springframework: "INFO" root: "INFO" spring: main: banner-mode: "off" profiles: production kafka: producer: bootstrap-servers: <HOST>:<PORT> topic-default: SrvDistributeMergePrivateClient.Request consumer: bootstrap-servers: <HOST>:<PORT> topics: SrvDistributeMergePrivateClient.Response group-id: kafkaAdapter kind: ConfigMap metadata: name: grpc-kafka-adapter-configСоздать Service для адаптера, предварительно указав:
spec.ports[].name: grpc;spec.ports[].port: 5454;spec.ports[].targetPort: 5454.
Service
apiVersion: v1 kind: Service metadata: {name: grpc-kafka-adapter} spec: selector: {name: grpc-kafka-adapter} ports: - {name: grpc, port: 5454, targetPort: 5454} type: ClusterIPСоздать ConfigMap по настройке fluentbit агента, предварительно указав:
точки подключения к брокерам Kafka:
Brokers <hostname>:<port>;Topics <topic>.
путь до файлов, которые необходимо читать:
Path <path to log files>.
fluent-bit.conf
apiVersion: v1 data: fluent-bit.conf: |- [SERVICE] Flush 1 Daemon Off Log_Level Debug Parsers_File /fluent-bit/etc/parsers.conf [INPUT] Name tail Path <path to log files> Parser custom Tag custom-tag Buffer_Chunk_Size 400k Buffer_Max_Size 6MB Mem_Buf_Limit 6MB [OUTPUT] Name kafka Brokers <hostname>:<port> Timestamp_Key fluent_time Topics <topic> rdkafka.security.protocol ssl rdkafka.ssl.ca.location /fluent-bit/cert/ca_cert.pem rdkafka.log.connection.close false rdkafka.request.required.acks 1 parsers.conf: |- [PARSER] Name custom Format json kind: ConfigMap metadata: annotations: name: fluent-bitСоздать Secret сертификата fluentbit, предварительно указав в блоке
ca_cert.pemзначение сертификата. fluent-bit-certapiVersion: v1 data: ca_cert.pem: >- LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tDQpNSUlGRFRDQ0F2V2dBd0lCQWdJUVlyUStoR2JRVHFsQktjTHVMQWo1dURBTkJna3Foa2lHOXcwQkFRVUZBREFaDQpNUmN3RlFZRFZRUURFdzVVWlhOMElGSnZiM1FnUTBFZ01qQWVGdzB4TmpBM01UVXhNVFV6TURWYUZ3MHpNakEzDQpNVFV4TWpBeU5UaGFNQmt4RnpBVkJnTlZCQU1URGxSbGMzUWdVbTl2ZENCRFFTQXlNSUlDSWpBTkJna3Foa2lHDQo5dzBCQVFFRkFBT0NBZzhBTUlJQ0NnS0NBZ0VBdS9mL1ppR04vK1RaSmhXVUsyN0N6VllGbjR6cDV4TnNVRG84DQphMktXcFVYbnNnQitDYXkweDNFK1RXRTA0R0ZlTjh5YW9wL3VIdGRITzEySmxjaWsvUmpwSnhuaDZNT3k4RnNYDQpZMEZCL3hTWEdqNjNnTkdCQkhGOTc2U2VHRWg5QmVncVVkT20yV0NNWEROcEx0OEdmZkU1ZUJuNGRXbWNwZTIwDQp6NE9SQmZoOXNiWnhya3VtVkwzKzhQaW1FQUZxam5mTmp0VXV3WWZTTThWd05QY2krZytiYXpwM2xicmxIMmRKDQpsdldLY3ZEalZKUENabEplQXJkYk05cHBvR0xzTmM0Nm8xNHZvR1RNcEY0OFJWK0Vqbm9Qdk42WHlURWZhbXV2DQpER2JoMzNoei9WY2JtdStlcFgwVXZvKy9RcG9ub0Y0Y3oyTXlTMlRvZHdRc2Nmbno0L2JjTG04TEtoQ0puMi9tDQp3VG11ZzVVbDF1TnhKdnBFRkwvSmdUa2ZZZTVtU0FJRW53U0JranBDc0krdlIzd3VhQi9zNitNK2JPSTlaV0xPDQp4clNWVkhBWXZMV1BZRkRGMGVabHNoZnFPTEZoTCtaZTJrYVhuL1gxNnFnYjdwQU4wNlhGdUR0TnpTb1lrZG4wDQoyOHdFcEJKNUVkdmRTczdOUzVPOGFaVkRId3BwamQvVVRqQlFCZGE1OENkNmlxM0VNN1oySW5KVjg3VHpXVXRJDQo5cm5Zdld0MWc0ZkhaZ1I2VGlhVmtsVzhtdDg5QzM4Uk9ELzNrNlVERy96b1F0RnhTNXA0em1HY2orREdrYm9WDQo4UktGRldxUnpYb0k3T3ZzQVhlWVVXSXMzVFVEU0QvWkFqeWhMTTJHak44TWV4dFpvaEtia1U0MW1OQ2VIR3JYDQpRSGdEeUxrQ0F3RUFBYU5STUU4d0N3WURWUjBQQkFRREFnR0dNQThHQTFVZEV3RUIvd1FGTUFNQkFmOHdIUVlEDQpWUjBPQkJZRUZKK3lUYWZyVDhzS1NRUkIweWNvckJ6THd4ZGJNQkFHQ1NzR0FRUUJnamNWQVFRREFnRUFNQTBHDQpDU3FHU0liM0RRRUJCUVVBQTRJQ0FRQkxJQTVENDJLY1Z1ZUNhWGdvRGFZK082bUdxRlFOeE1BYkVpZ1QyTk44DQpueFNiVFpjN0tuU3pCaVg2aGxUZW9BTFJWSVcrZXVKdklpZTZ6OE4ra00xNk0vNmdCZUluaTVFRHFadzZqOEhMDQpzdFkyV2R3S29VdThkcDgyWUF3Mkl4eURzamo3cUZGV3RjNVNtLzQ5VGlld2hFeDkyR1ZWcDZ5eGtlQjdHK1ZQDQpWTFNnSlZTUDc1YzNUdHR4V3NHVWZWSUt1WXVCNzQ3TldYb0Y3bkZEeXQ4TjJ5bTJqdEVmaElHOGwvRHJWMmxmDQpQb3hZZHhGWEVVcldqYlB0WHlyUTdSOEc5bm1zMWdCbU0vT1RocUFEbzRsRk44U2Q4MC9ETjV6RDIzOXBMcFExDQp3UXVPVzhGV3ozejV6bTJrNmRHLzRiTG9xSkh3QjFBU29MUW1sNUVxeENjaXd0S0lxRE1jb3ltN0ptTGhycmFIDQpIcmI3cGY5YnpOd2c3TXJZZ0VEbjFIYkhzRWR4NkJscndrVWdHWDFlQmljaVJEWENpUHJCSm16YzlrSG5EOE9ZDQpoU0F1Q21aaVcwNFhlTDM5ak8raWZtSlg4ZlVNRkJHZE1VRUR5YmFMQlhZdGlZMnZ4YzV5elBRUlVkdURhVVJKDQpSVENnN1owbERXS2ZLWUc2RDU0dGNVc2hmeENBUjBMYWs4dndBK3RoV05DN0RmU3pRLzgrajdiY0p4UW9QeWpWDQpUY3d4MDVCVnkrL3JmTW5pT3FST0M2VnozRlpWQ1lZZnAzdENhYkI5alNReXJzck41RHN6YzN0SmxjODFtakNLDQpqZUZkQlRvOW5FS2ZKcjIxd0krRXJveFhlbE9rTFF0bnRwcEpKSEE4TFh4NG5MUVlWdWFINkFiTHZQaE85R0xrDQozZz09DQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0t kind: Secret metadata: name: fluent-bit-cert type: OpaqueСоздать Deployment,
В конфигурационном файле должны быть заданы параметры:spec.template.spec.volumes[0].configMap.name: <Имя ConfigMap с application.yml для основного контейнера Шлюза>;spec.template.spec.volumes[1].configMap.name: <Имя ConfigMap с fluent-bit.conf для контейнера журналирования>;spec.template.spec.containers[0].image: <ссылка на Docker-образ Шлюза в целевом репозитории>;spec.template.spec.containers[1].image: <ссылка на Docker-образ агента журналирования в целевом репозитории>.
Deployment
apiVersion: apps/v1 kind: Deployment metadata: {name: grpc-kafka-adapter} spec: replicas: 1 progressDeadlineSeconds: 120 strategy: type: RollingUpdate rollingUpdate: maxUnavailable: 0 selector: matchLabels: {name: grpc-kafka-adapter} template: metadata: annotations: {sidecar.istio.io/inject: 'true'} labels: {app: grpc-kafka-adapter, name: grpc-kafka-adapter, version: v1} spec: terminationGracePeriodSeconds: 60 containers: - env: - name: PROJECT_NAME valueFrom: fieldRef: {apiVersion: v1, fieldPath: metadata.namespace} securityContext: runAsNonRoot: true readOnlyRootFilesystem: true name: grpc-kafka-adapter image: <image_version> imagePullPolicy: Always ports: - containerPort: 5454 protocol: TCP resources: limits: {cpu: 400m, memory: 512Mi} requests: {cpu: 400m, memory: 512Mi} volumeMounts: - {mountPath: /opt/synapse/logs, name: synapselogs} - {mountPath: /fluent-bit/etc/, name: fluent-bit, readOnly: true} - {mountPath: /ssl, name: grpc-kafka-adapter-keystore, readOnly: true} - {mountPath: /deployments/config/, name: grpc-kafka-adapter-config, readOnly: true} - {mountPath: /deployments/config/ssl/, name: grpc-kafka-adapter-ssl-secret, readOnly: true} livenessProbe: httpGet: {path: /actuator/health/liveness, port: 8787} initialDelaySeconds: 60 timeoutSeconds: 5 failureThreshold: 10 readinessProbe: httpGet: {path: /actuator/health/readiness, port: 8787} initialDelaySeconds: 60 timeoutSeconds: 5 failureThreshold: 3 - image: <logging_agent_image_version> imagePullPolicy: Always securityContext: runAsNonRoot: true readOnlyRootFilesystem: true name: fluent-bit resources: limits: {cpu: 100m, memory: 300Mi} requests: {cpu: 100m, memory: 300Mi} volumeMounts: - {mountPath: /opt/synapse/logs, name: synapselogs} - {mountPath: /fluent-bit/etc/, name: fluent-bit, readOnly: true} - {mountPath: /fluent-bit/cert/, name: fluent-bit-cert, readOnly: true} volumes: - configMap: defaultMode: 256 name: grpc-kafka-adapter-config name: grpc-kafka-adapter-config - configMap: defaultMode: 256 name: fluent-bit name: fluent-bit - name: grpc-kafka-adapter-keystore secret: defaultMode: 256 optional: true secretName: grpc-kafka-adapter-keystore - name: grpc-kafka-adapter-ssl-secret secret: defaultMode: 256 optional: true secretName: grpc-kafka-adapter-ssl-secret - emptyDir: {} name: synapselogs - name: fluent-bit-cert secret: defaultMode: 256 optional: true secretName: fluent-bit-certСоздать ServiceEntry для разрешения доступа к внешнему ресурсу - брокерам Kafka (egress не требуется), предварительно указав:
spec.hosts[]: not.used;spec.ports[].protocol: TCP;spec.ports[].name: tcp;spec.ports[].number: <port>;spec.addresses: <IP>/<MASK>.ServiceEntry
apiVersion: networking.istio.io/v1alpha3 kind: ServiceEntry metadata: name: kafka-broker-dev spec: addresses: - <IP>/<MASK> #диапазон ip адресов брокеров Kafka exportTo: - . hosts: - not.used location: MESH_EXTERNAL ports: - name: tcp number: <port> #порт брокеров Kafka protocol: TCP
Настройка SSL:
Создать JKS keystore, который будет содержать клиентский ключ и клиентский сертификат, а также Root-сертификаты от ЦА:
# создайте из ключа и сертификата файл в формате p12, а затем конвертируйте в JKS openssl pkcs12 -export -in client.pem -inkey client.key -out keystore.p12 -name 00CA0001CSYNAPSEsnp99usr keytool -importkeystore -deststorepass <ПАРОЛЬ> -destkeypass <ПАРОЛЬ> -destkeystore keystore.jks -srckeystore keystore.p12 -srcstoretype PKCS12 -deststoretype JKS -srcstorepass <ПАРОЛЬ> -alias 00CA0001CSYNAPSEsnp99usr # добавьте ЦА сертификаты в хранилище keytool -keystore keystore.jks -alias "test root ca 2" -import -file ca-root.crt keytool -keystore keystore.jks -alias "sberbank test issuing ca 2" -import -file ca-chain.crtСоздать секрет, содержащий JKS keystore, предварительно указав в блоке
keystore.jksзначение keystore (значение после «keystore.jks: >-» хранится в base64 кодировке):
keystoreapiVersion: v1 data: keystore.jks: >- /u3+7QAAAAIAAAADAAAAAQAYMDBjYTAwMDFjc3luYXBzZXNucDk5dXNyAAABasTUhmwAAAUBMIIE/TAOBgorBgEEASoCEQEBBQAEggTp5Kl/i64SYwC9uqp9VlT7ma01hwRR4+uaGfPeCR8aUVrSeQCbSHeb4Lxbt4MftZyRS8GKckGodCCCKPe2NCyu++EeqqKhXocrmMPlyxidyCupaGyN2PwtHk2NJ5mfS0ouL5i4k4ua7A2hhFspnQBRuUkczBONz168Jk7+236IkZCGPCv4okptOqxSWMIraWBJ0YK0D2cBGcNf+VYm1B5y6gQ16Bo/7a1RYFVI4B9H7MXaLWvpgUhVE/X6hZVPFumQ7VmygfTui1CPkkwCmt4lQL/oFiQTHxooN3DO7u+v6hSoO7WKF4pvnv25ZknmJnl+VhFGp3q75m3Fns9FgCqgPvKB4wN2d7+RwZ5eGHQOCm7pOoRhoK9iXmg8ZyMAs/d60gGFvmVoi/aCnm0bDDjDl2GOJ/RIPSTNLmj6DE/ZY6cg8Kqc0sNxsf7Yn8pXL6lp99pqY91jzu3LuCGscGSK7L8THSD9uOJZcMQ0eo8zNFKUuJL9oPETZZuBI6wI8toJFwv0MEcG5MJB60C0UAmFxPv5uL1olndJUF8Bsw2O7xnr/UcA7GotkvWRiLVqjBjGRv/+tl19RSTcwGFnkgEuDDxwaSBJHtDB4WBh5Ml2vBPlzLhCwW+dEZXl6zFgc8XgFnjiOqatre4PXvW3da/BTZzAWEeQPgzXRO1KDciUazvxx0uXio4selqNBESgbz/iAM6NplhSM1XU2YuZAnPEYR/fcefhMRGJ9XXi6wekE6bafZS0rCEi3z/mnPA5t1z0Lnyb24HPpMv4ZMh4TrK8eVyeV5SASJSUnYX2zoMAM01BG6IrIn5I2yQvbb0a5ZchS46OtV8v+Pg6kTJn0OubKRehA+QADaG5Q5uQ5QWt2nU+45hZ+3L48iIk1Uzdf26O8a26+FJJ2J3dic7xwfxEGeRxt1xuLHgKbKGdCucILUuLHaiIH660+ctN4vs3HBa0IfkPQl4/yDNJ1w/5liJ5ciHihDQp3FYVN8991HHeLqU3Qp5mR/1l3yY1qlW+g4Rl5/P6a5eLWS1+XTFPX2UFVobiS8lTnvj6dtqdpzOHNkCnkp4GQ2XDMSIYbyxizeBoqz32P8f9TP0bgpxFWEXz8oUN10T7ecm4Vfd7Pd7nvC6gcGa6MQyTKinhw6s9OVVOD5b7QGKKvGvy8dMONNoE0KXfk+MouTu4bnPFDhrI1InENfV9psJ4OxxxaNHyNUJ/jh4TnqEe47f982uInUQyeCKCIChvJWxPDZAkQVM5Dbf+Kv0+YvewcFkv3zbvF5W0+6VCnAsOxxbYqJyK48r4J7z6qdho7LpEidXjM6KlM91CtFmxd/sKlJQ0sGjRUY1gjLHsS1vU+giWGVCmslDEPDVw3emR9PSU7c836IBbjDP6DY+mDIzhmtlb1BjeJ7y/idNrM1JT++OLWIPKb5/mwuDUG60+u0e3PIsoC/a+ZMiDrA4+Rw2/9IDROHNoJRNCjc+ItSRtOj40T6lLIS7MjdNPJghJfehJ66IHsxSCgMfi9htZy/dcTjhD4b3sFp+kII3OZKU3BdXXVznn9oViCiXkqjW2OcNeczdLYQT4hhlhAFqsfHktEfmpw0bDHn0ufwtHmkdmV52gtvhMJpzje3vBa1b2IWr9AlcKZMes7h+XIYwNg4JORhZnU9t6dep54ubzuoXzV8lIAAAAAQAFWC41MDkAAAbgMIIG3DCCBMSgAwIBAgIKaZpdGAABAAA06TANBgkqhkiG9w0BAQsFADBjMRIwEAYKCZImiZPyLGQBGRYCcnUxFDASBgoJkiaJk/IsZAEZFgRzYnJmMRIwEAYKCZImiZPyLGQBGRYCY2ExIzAhBgNVBAMTGlNiZXJiYW5rIFRlc3QgSXNzdWluZyBDQSAyMB4XDTE5MDUwODA2MTcxMloXDTIzMDUwODA2MjcxMlowcDELMAkGA1UEBhMCUlUxLzAtBgNVBAoTJlNhdmluZ3MgQmFuayBvZiB0aGUgUnVzc2lhbiBGZWRlcmF0aW9uMQ0wCwYDVQQLEwQwMENBMSEwHwYDVQQDExgwMENBMDAwMUNTWU5BUFNFc25wOTl1c3IwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDIUmw4gt8tB8q3Q4O2snvZGBZF/wQ/ZIH22yTbZoARkwRYjuc+j5jmB5yDfjBBmzdbCWzKIUBDuq6lNyR3NWUarxnz2zBGRhX9HwOwuECVCtvb4e7FlIsh/msLOZhldyF+iLP1r0lU1Ox3Y4IC2KVYEynlsLrG+pdZMOJLfVLczFlEZXGhqWpysOlsrbdUCEmctvAtzz3ZPyod7+2UTen9mX3LGt6obaCohlKcW3kQ18dIlj0fIUyJMCV9ECTCwFYCwt3lOddmSBtI92zmLMM312v5p9iyzHnK3J/UPWjtQR3TTGWnUs1fRhP+6DvYpjzNQQdewV3qOjpsM3loyJnzAgMBAAGjggKDMIICfzAdBgNVHQ4EFgQUJgrGPrAo8fioVecAVyKrGTrgjO0wHwYDVR0jBBgwFoAUUV/qviYeTSBYvItLGq/2BYyhdTIwggFsBgNVHR8EggFjMIIBXzCCAVugggFXoIIBU4ZFaHR0cDovL3BraS5zYmVyYmFuay5ydS9wa2kvY2RwL1NiZXJiYW5rJTIwVGVzdCUyMElzc3VpbmclMjBDQSUyMDIuY3JshoHAbGRhcDovLy9DTj1TYmVyYmFuayUyMFRlc3QlMjBJc3N1aW5nJTIwQ0ElMjAyLENOPVRWLUNFUlQtVUIsQ049Q0RQLENOPVB1YmxpYyUyMEtleSUyMFNlcnZpY2VzLENOPVNlcnZpY2VzLERDPVVuYXZhaWxhYmxlQ29uZmlnRE4/Y2VydGlmaWNhdGVSZXZvY2F0aW9uTGlzdD9iYXNlP29iamVjdENsYXNzPWNSTERpc3RyaWJ1dGlvblBvaW50hkdodHRwOi8vaW50cGtpLmNhLnNicmYucnUvcGtpL2NkcC9TYmVyYmFuayUyMFRlc3QlMjBJc3N1aW5nJTIwQ0ElMjAyLmNybDCBvgYIKwYBBQUHAQEEgbEwga4wVAYIKwYBBQUHMAKGSGh0dHA6Ly9wa2kuc2JlcmJhbmsucnUvcGtpL2FpYS9TYmVyYmFuayUyMFRlc3QlMjBJc3N1aW5nJTIwQ0ElMjAyKDEpLmNydDBWBggrBgEFBQcwAoZKaHR0cDovL2ludHBraS5jYS5zYnJmLnJ1L3BraS9haWEvU2JlcmJhbmslMjBUZXN0JTIwSXNzdWluZyUyMENBJTIwMigxKS5jcnQwDAYDVR0TAQH/BAIwADANBgkqhkiG9w0BAQsFAAOCAgEAQFHEWrITBZLrwmaKelhkZHua6k8JuNrgddr2jBko9zq7S2z3sAz8n7Nt4+RA71crxk/V1wBF+NYqUzx48umKL3lWmWvIpVVpC4DQL7z+DyIG+kCdw/NlaPN+dnk2M9+2TOBBJhca2KfH7HIc4bQNlmpEs6ID+oriwnHJ1YoEWKSN9uuc0KAlnT8VPB0tLyIHd8H4QwG1xRy03YEFvp5r3jXvO8R9Y3itQDtRFLk4bOJehYyLCe5FgXJB1KVKj7ODwhT+zKYRhWyTrfbEfAxn1bofnGsAsTg29b8xv0Ow4i7PKtDd7giR1h95uorxKWfZC0blsYesS8Br5FahjLRW2Onxwedi8opqyVtzvh3dzpJiYwQ8Kj3fz6jRb+Bbt/DI2Cd0KBGN+az97wk+aQktXFCjsCNvmptC4f0JjuRA+P55g6asXfALVuoP42jFPoC+KunJ26kLVyl4rpP/V1kt7gsawzcu7XS+2kBwouukz24f6M+mtnJy+ElrTsbn0RuGo5AF/RzLFZCotOh4OX4ESztshDPtjpR9ptETz+6KlH3+ZgzqAti/5ERrgpAYiDSvJi1gEQD4kfS2z0X89GMcnSI3wJBLS7hYJwbIioVSOfK3Oig/AWzKBd2XWN3Oo7hjl0c8NyxYNdZpOWVTUIYb3CQne61SMItzCW3d1hCFJTcAAAACAA50ZXN0IHJvb3QgY2EgMgAAAWqWYEDhAAVYLjUwOQAABREwggUNMIIC9aADAgECAhBitD6EZtBOqUEpwu4sCPm4MA0GCSqGSIb3DQEBBQUAMBkxFzAVBgNVBAMTDlRlc3QgUm9vdCBDQSAyMB4XDTE2MDcxNTExNTMwNVoXDTMyMDcxNTEyMDI1OFowGTEXMBUGA1UEAxMOVGVzdCBSb290IENBIDIwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQC79/9mIY3/5NkmFZQrbsLNVgWfjOnnE2xQOjxrYpalReeyAH4JrLTHcT5NYTTgYV43zJqin+4e10c7XYmVyKT9GOknGeHow7LwWxdjQUH/FJcaPreA0YEEcX3vpJ4YSH0F6CpR06bZYIxcM2ku3wZ98Tl4Gfh1aZyl7bTPg5EF+H2xtnGuS6ZUvf7w+KYQAWqOd82O1S7Bh9IzxXA09yL6D5trOneVuuUfZ0mW9Ypy8ONUk8JmUl4Ct1sz2mmgYuw1zjqjXi+gZMykXjxFX4SOeg+83pfJMR9qa68MZuHfeHP9Vxua756lfRS+j79CmiegXhzPYzJLZOh3BCxx+fPj9twubwsqEImfb+bBOa6DlSXW43Em+kQUv8mBOR9h7mZIAgSfBIGSOkKwj69HfC5oH+zr4z5s4j1lYs7GtJVUcBi8tY9gUMXR5mWyF+o4sWEv5l7aRpef9fXqqBvukA3TpcW4O03NKhiR2fTbzASkEnkR291Kzs1Lk7xplUMfCmmN39ROMFAF1rnwJ3qKrcQztnYiclXztPNZS0j2udi9a3WDh8dmBHpOJpWSVbya3z0LfxE4P/eTpQMb/OhC0XFLmnjOYZyP4MaRuhXxEoUVapHNegjs6+wBd5hRYizdNQNIP9kCPKEszYaM3wx7G1miEpuRTjWY0J4catdAeAPIuQIDAQABo1EwTzALBgNVHQ8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4EFgQUn7JNp+tPywpJBEHTJyisHMvDF1swEAYJKwYBBAGCNxUBBAMCAQAwDQYJKoZIhvcNAQEFBQADggIBAEsgDkPjYpxW54JpeCgNpj47qYaoVA3EwBsSKBPY03yfFJtNlzsqdLMGJfqGVN6gAtFUhb564m8iJ7rPw36QzXoz/qAF4ieLkQOpnDqPwcuy1jZZ3AqhS7x2nzZgDDYjHIOyOPuoUVa1zlKb/j1OJ7CETH3YZVWnrLGR4Hsb5U9UtKAlVI/vlzdO23FawZR9Ugq5i4Hvjs1ZegXucUPK3w3bKbaO0R+EgbyX8OtXaV8+jFh3EVcRStaNs+1fKtDtHwb2eazWAGYz85OGoAOjiUU3xJ3zT8M3nMPbf2kulDXBC45bwVbPfPnObaTp0b/hsuiokfAHUBKgtCaXkSrEJyLC0oioMxyjKbsmYuGutocetvul/1vM3CDsytiAQOfUdsewR3HoGWvCRSAZfV4GJyJENcKI+sEmbNz2QecPw5iFIC4KZmJbThd4vf2M76J+Ylfx9QwUEZ0xQQPJtosFdi2Jja/FznLM9BFR24NpRElFMKDtnSUNYp8pgboPni1xSyF/EIBHQtqTy/AD62FY0LsN9LND/z6PttwnFCg/KNVNzDHTkFXL7+t8yeI6pE4LpXPcVlUJhh+ne0JpsH2NJDKuys3kOzNze0mVzzWaMIqN4V0FOj2cQp8mvbXAj4SujFd6U6QtC2e2mkkkcDwtfHictBhW5ofoBsu8+E70YuTeAAAAAgAac2JlcmJhbmsgdGVzdCBpc3N1aW5nIGNhIDIAAAFqlmAbHAAFWC41MDkAAAqcMIIKmDCCCICgAwIBAgITGAAAAAS5RQdwBbAYQAAAAAAABDANBgkqhkiG9w0BAQsFADAZMRcwFQYDVQQDEw5UZXN0IFJvb3QgQ0EgMjAeFw0xNjA3MTUxMjQ3NDNaFw0yNDA3MTUxMjU3NDNaMGMxEjAQBgoJkiaJk/IsZAEZFgJydTEUMBIGCgmSJomT8ixkARkWBHNicmYxEjAQBgoJkiaJk/IsZAEZFgJjYTEjMCEGA1UEAxMaU2JlcmJhbmsgVGVzdCBJc3N1aW5nIENBIDIwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDOe9PJjSMr4QGveswkep1/73kCHOmAlPiWoE8BjppW+uDibtc7yWXKipE4QgbqXNz1u3oy6WyLQ3PBA2PPclg8O425xCH8VgbOWz0FmM7vdFoMYJGLFpVq9DE1VEHI3MP3lOt5xhDU3IVaCHnMDddx8mFS2RkXNvDBLa5NBfge2UM5SslEQROJc3fuFQmLD4HRlaJe+sgOqm3buw3a5WVGbaDGQ8L/e1kmRyQa78IrHcbWFwge9aWbHaUY4e/uB1z+6EzWuaODRsjlrpXH/dvg4XLsVJGloxQcqgFhvbml3fcqlPhSWydiUHnimzIOILdPD9HBIfyOuSDpIpk5WeLXNEJe/i0EotclEIyWwWOjUoFh3ZiK6seEVbMNXWftNiKr6W3bNW+kfseSOUddASHxBz5WsZQAnigLuwwjTr+k1x7Vi7F8xTzp9JDuB2i+0eK9CSoFLjq+fHqziUmNEX2xTbIJjeJp511Wuj5iv+0imirUJJJCbRJefweO84lQpnhVvsk4wYY2AuYOm71/G/BfQ3yypwprhuLNcG5yXZ/ZTW0y/6jKmNNLlvnV56CKElOr13z427K6VK7eIwHZgeQpcUkja3zW+cblrFrwge+BduspNqO6dn/2HaP5cfoc3Un+F1nY5Whu8RxrblEecF/5ZLzc1g2JC2pzDiwTg4fQPwIDAQABo4IFjTCCBYkwEAYJKwYBBAGCNxUBBAMCAQEwIwYJKwYBBAGCNxUCBBYEFBKfeeA/4akm+B0LLAPMsvGeTDtLMB0GA1UdDgQWBBRRX+q+Jh5NIFi8i0sar/YFjKF1MjAZBgkrBgEEAYI3FAIEDB4KAFMAdQBiAEMAQTALBgNVHQ8EBAMCAYYwDwYDVR0TAQH/BAUwAwEB/zAfBgNVHSMEGDAWgBSfsk2n60/LCkkEQdMnKKwcy8MXWzCCAloGA1UdHwSCAlEwggJNMIICSaCCAkWgggJBhjdodHRwOi8vcGtpLnNiZXJiYW5rLnJ1L3BraS9jZHAvVGVzdCUyMFJvb3QlMjBDQSUyMDIuY3JshoHGbGRhcDovLy9DTj1UZXN0JTIwUm9vdCUyMENBJTIwMixDTj1UZXN0Um9vdENBMjU2LENOPUNEUCxDTj1QdWJsaWMlMjBLZXklMjBTZXJ2aWNlcyxDTj1TZXJ2aWNlcyxDTj1Db25maWd1cmF0aW9uLERDPXNpZ21hLERDPXNicmYsREM9cnU/Y2VydGlmaWNhdGVSZXZvY2F0aW9uTGlzdD9iYXNlP29iamVjdENsYXNzPWNSTERpc3RyaWJ1dGlvblBvaW50hoG9bGRhcDovLy9DTj1UZXN0JTIwUm9vdCUyMENBJTIwMixDTj1UZXN0Um9vdENBMjU2LENOPUNEUCxDTj1QdWJsaWMlMjBLZXklMjBTZXJ2aWNlcyxDTj1TZXJ2aWNlcyxDTj1Db25maWd1cmF0aW9uLERDPXNicmYsREM9cnU/Y2VydGlmaWNhdGVSZXZvY2F0aW9uTGlzdD9iYXNlP29iamVjdENsYXNzPWNSTERpc3RyaWJ1dGlvblBvaW50hj9odHRwOi8vZXh0cGtpLnNpZ21hLnNicmYucnUvQ2VydEVucm9sbC9UZXN0JTIwUm9vdCUyMENBJTIwMi5jcmyGPGh0dHA6Ly9pbnRwa2kuY2Euc2JyZi5ydS9DZXJ0RW5yb2xsL1Rlc3QlMjBSb290JTIwQ0ElMjAyLmNybDCCAncGCCsGAQUFBwEBBIICaTCCAmUwQwYIKwYBBQUHMAKGN2h0dHA6Ly9wa2kuc2JlcmJhbmsucnUvcGtpL2FpYS9UZXN0JTIwUm9vdCUyMENBJTIwMi5jcnQwgbgGCCsGAQUFBzAChoGrbGRhcDovLy9DTj1UZXN0JTIwUm9vdCUyMENBJTIwMixDTj1BSUEsQ049UHVibGljJTIwS2V5JTIwU2VydmljZXMsQ049U2VydmljZXMsQ049Q29uZmlndXJhdGlvbixEQz1zaWdtYSxEQz1zYnJmLERDPXJ1P2NBQ2VydGlmaWNhdGU/YmFzZT9vYmplY3RDbGFzcz1jZXJ0aWZpY2F0aW9uQXV0aG9yaXR5MIGvBggrBgEFBQcwAoaBomxkYXA6Ly8vQ049VGVzdCUyMFJvb3QlMjBDQSUyMDIsQ049QUlBLENOPVB1YmxpYyUyMEtleSUyMFNlcnZpY2VzLENOPVNlcnZpY2VzLENOPUNvbmZpZ3VyYXRpb24sREM9c2JyZixEQz1ydT9jQUNlcnRpZmljYXRlP2Jhc2U/b2JqZWN0Q2xhc3M9Y2VydGlmaWNhdGlvbkF1dGhvcml0eTBZBggrBgEFBQcwAoZNaHR0cDovL2V4dHBraS5zaWdtYS5zYnJmLnJ1L0NlcnRFbnJvbGwvVGVzdFJvb3RDQTI1Nl9UZXN0JTIwUm9vdCUyMENBJTIwMi5jcnQwVgYIKwYBBQUHMAKGSmh0dHA6Ly9pbnRwa2kuY2Euc2JyZi5ydS9DZXJ0RW5yb2xsL1Rlc3RSb290Q0EyNTZfVGVzdCUyMFJvb3QlMjBDQSUyMDIuY3J0MA0GCSqGSIb3DQEBCwUAA4ICAQADGKevectc4iZbVD5sHbmX1Cz6feeJql/ZPqBJyjVLLoJFELVJnwwshU8WOdakJjeOx7g2k2p+tbTw+hNpmgYh+kqqR6pTEntuigHXC3ncqYG7UlDOFAKA+6EGPpTxEAoy1cFDJo0dNFK5rOzvXD8gbCEC6jKB77bqNZN8R4jGkvOhT47tSC0PoNMwizeyKDVsVdaY7L4Dqg4XspV++ojr8k+13veME3I+REoubZlMwJUnlyBkXDwfQ636TFzbBKqwSgXqlBH0rpKIUfJ2m/Ow1S61VVQYsubbf7BRZH/eO0NwB/LqEKhHoFPrvaGj/4gxHAGJKl8ZiO93LgcPTjkWLxg/Cis+euipt+TZ/0WwXcZhnfjAzjq/vQii70054JZILb62VRZR/cmoISC6wFQ3GxkzIBnKtGPyQgCdkL+KSFYimCqV2jpecpikUvg0s4JokfVTGwFq0mtUpz3YtIjwrRp2Js+bmrCMtpoYLwEJ/5qThnxLQ/3O+0z3CklX9vrnazecwl0n1VCaec27fjFSpEglABIAmImCIc4FHuJp9ej3gwpTaCeaKdE4uGElLWzh8L3uDBi6/74gMvcBK/fPTAVc+fW/3ZXc2jNU/1Y3hN24q8DZnbtLBnnuKH1xJ7PgM2X8dRe6y6MO7VA8ERavJ9eXSr8WFuXXxVJB9bF2BfTxxF7tSmiPKEE2VYwZaWesu03a kind: Secret metadata: name: grpc-kafka-adapter-scm-keystore type: OpaqueСоздать секрет, в котором будут указаны пароли к jks, ssl.yml должен быть определен в следующем виде:
spring: kafka: ssl: key-password: <Пароль от ключа клиента> key-store-password: <Пароль к keystore> trust-store-password: <Пароль к truststore>Secret ssl.yml
apiVersion: v1 data: ssl.yml: >- c3ByaW5nOg0KICBrYWZrYToNCiAgICBzc2w6DQogICAgICBrZXktcGFzc3dvcmQ6ICJxd2UxMjMiDQogICAgICBrZXktc3RvcmUtcGFzc3dvcmQ6ICJxd2UxMjMiDQogICAgICB0cnVzdC1zdG9yZS1wYXNzd29yZDogInF3ZTEyMyINCg== kind: Secret metadata: name: grpc-kafka-adapter-ssl-secret type: OpaqueУбедиться, что секреты монтируются в Pod, зайдя в терминал пода и посмотрев файловую систему
Указать в конфиге адаптера application.yml настройки ssl, которые должны содержать
пути до хранилищ доверенных и личных сертификатов:
spring.kafka.ssl.key-store-location: <path>;spring.kafka.ssl.trust-store-location: <path>.
протокола подключения:
spring.kafka.ssl.protocol: TLSv1.2;spring.kafka.ssl.properties.security.protocol: SSL;spring.kafka.ssl.properties.ssl.enabled.protocols: TLSv1.2.
настройки ssl
spring: kafka: ssl: key-store-location: file:/ssl/keystore.jks trust-store-location: file:/ssl/keystore.jks protocol: TLSv1.2 properties: security.protocol: SSL ssl.enabled.protocols: TLSv1.2
Использование программного компонента#
Шлюз Kafka предназначен для организации взаимодействия внутренних компонентов Synapse (правообладателем которых является АО «СберТех»), таких как другие шлюзы или сервисы трансформации, с внешними АС с помощью topic(s), расположенных на брокерах Kafka. Для внутренних вызовов между компонентами Synapse используется протокол gRPC. Для сериализации передаваемых сообщений внутри Synapse используется ProtoBuf.
Использование компонента Шлюз Kafka позволяет исключить специфичную логику вызова topic(s) Kafka из внутренних компонентов Synapse.
Шлюз Kafka реализован в виде контейнеризированного Java приложения, запускаемого в среде оркестрации контейнеров.
Шлюз Kafka может использовать технологические сервисы платформы, обеспечивающие прикладное журналирование, интеграционное журналирование, а также включенного механизма оркестрации сервисов Istio.
Шлюз Kafka поставляется в виде собранного докер образа. Загрузка образа в среду исполнения и настройка его параметров для конкретной прикладной задачи производится в файлах конфигурации. Набор этих файлов и составляет прикладной дистрибутив шлюза.
Один прикладной дистрибутив (deployment) Шлюза Kafka может обеспечивать работу нескольких независимых интеграционных цепочек, при условии, что они не требуют установки взаимоисключающих значений параметров. В противном случае потребуется использовать для каждой интеграционной цепочки отдельный Deployment Шлюза Kafka с отдельным набором параметров.
Шлюз Kafka может обеспечивать взаимодействия типа Запрос-Ответ и Нотификация, как по инициативе внешней системы, так и по инициативе внутренних микросервисов.
Типовая упрощенная схема на рис. 1 показывает место Kafka шлюза в интеграционной цепочке.

Использование пользовательских метрик мониторинга#
Формирование метрик мониторинга осуществялется с помощью Spring Actuator.
Метрики в формате prometheus доступны по адресу: <ip>:<spring_boot_server_port>/actuator/prometheus
Помимо стандартных метрик Spring Boot приложения доступны следующие метрики:
Метрика |
Описание |
|---|---|
kafka_topic_received_messages_total |
Количество сообщений, полученных из topic(s) |
kafka_topic_sended_messages_total |
Количество сообщений, отправленных в topic(s) |
Управление listener сообщений (включение/отключение) из topic(s) Kafka в рантайме#
У каждого topic(s) есть параметры spring.kafka.consumer.topics.topic.enabled (по умолчанию true) и spring.kafka.consumer.common-enabled (по умолчанию true), которые отвечают за работу listener.
spring.kafka.consumer.topics.topic.enabled необходимо указывать в локальной config map шлюза (для каждого шлюза отдельно).
spring.kafka.consumer.common-enabled необходимо указывать в глобальной config map шлюза (одна конфигурация на весь namespace), пример такой конфигурации:
kind: ConfigMap
apiVersion: v1
metadata:
name: common-kafka-config
data:
common-kafka-config.yml: |-
spring:
kafka:
consumer:
common-enabled: false
При старте шлюза проверяется на соответствие, что оба параметра для listener в состоянии true, иначе listener не будет запущен.
Таблица соответствий включения listener:
Общий параметр common-enabled |
Параметр enabled на каждый topic(s) отдельно |
Состояние listener при старте |
|---|---|---|
true |
true |
включен |
false |
true |
выключен |
true |
false |
выключен |
false |
false |
выключен |
Чтобы отключить или включить listener в runtime достаточно задать любому из параметров значение false/true в config map адаптера.
При переведении любого из них в false - listener выключаются
При переведении любого из них в true - listener поднимаются
Управление механизмами повторной отправки сообщений в случае ошибок#
Транзакционный режим чтения сообщения из topic(s) кафка и отправки его по GRPC/REST#
В настройках адаптера необходимо указать spring.kafka.consumer.transactional=true.
Настройка транзакционного режима:
spring:
kafka:
consumer:
transactional: <true/false - если true, то в случае ошибки доставки сообщения фиксировать ли offset topic(s) кафки (в этом случае адаптер будет бесконечно пробовать читать недоставленные сообщения и отправлять по grpc/rest)>
repeat-after-error-ms: <в случае transactional=true, период повторной попытки вычитки и отправки сообщения при ошибке>
Тогда в случае ошибки при попытке отправить сообщение по GRPC/REST offset в topic(s) не будет закоммичен и спустя время, указанное в параметре repeat-after-error-ms будет произведена повторная попытка вычитать данное сообщение.
Повтор вызовов на уровне producer кафки при попытке отправить сообщения в topic(s)#
В настройках адаптера необходимо указать acks=1/all и выставить количество ретраев.
Настройки повторов на уровне producer:
spring:
kafka:
producer:
acks: 1 <поддерживаемые значения: 0 - не ждать ответа от брокеров (retries невозможны), 1 - ждать подтверждение записи от одного брокера лидера, all - ждать подтверждение записи от всех брокеров>
retries: 10 <количество повторных попыток>
Подключение адаптера к резервному кластеру Kafka#
При подключении адаптера к резервному кластеру Kafka:
1. Адаптер читает сразу из двух кластеров;
2. Адаптер записывает сообщения в основной кластер. В случае ошибки записи в основной кластер, пытается записать в резервный кластер и исключает из балансировки основной кластер на время, указанное в параметре `spring.kafka.producer.main-cluster-exclude-time-ms` (по умолчанию `60000мс`);
3. Соответсвенно необходимо задать следующие параметры: `spring.kafka.producer.main-cluster-exclude-time-ms`, `spring.kafka.producer.reserve-servers`, `spring.kafka.consumer.reserve-servers`.
Пример конфигурации:
kafka:
producer:
main-cluster-exclude-time-ms: 60000
bootstrap-servers: host.ru:9092
reserve-servers: host2.ru:9092
topic-default: NT.SYN.ESB.CLIENTDEDUPREQUESTED
consumer:
bootstrap-servers: host:9092
reserve-servers: host:9092
topics:
- topic: NT.COD.CLNT.CLIENTDEDUPLICATED
group-id: kafkaAdapter
Включение синхронного режима работы#
Для того, чтобы включить синхронный режим работы адаптера, при котором происходит не только доставка сообщения в topic(s), но и ожидается ответ из topic(s), необходимо:
Добавить в блок spring.kafka.sync-mode конфигурации адаптера следующие настройки:
spring:
kafka:
sync-mode:
enabled: true # если указать true, можно не отправлять заголовок x-synapse-sync:true
reply-topic: <reply-topic> # topic(s) ответа
timeout: 10
После, при вызове API адаптера (протоколы: REST/GRPC) сообщение будет отправляться в topic(s) producer и адаптер будет ожидать ответ из topic(s) по умолчанию, указанного в reply-topic. Время ожидания ответа регулируется параметром timeout в секундах. Если необходимо заменить topic(s) ответа, то при вызове API нужно передать заголовок x-synapse-reply-topic с другим названием topic(s). Отключить/включить синхронный режим можно также с помощью заголовка x-synapse-sync, передав в нем значения false/true соответственно.
При ответе на стороне принимающей системы должен быть скопирован заголовок запроса correlation-id для успешной корреляции запроса и ответа на стороне адаптера.
Маршрутизация сообщений с помощью адаптера по сервисам в зависимости от topic(s)#
Для протокола GRPC необходимо добавить в блок конфигурации адаптера следующие настройки:
# сначала определяем различные маршруты для GRPC клиента
grpc:
client:
settings:
default:
hostname: <service-name>
port: 6565
service1:
hostname: <service-name2>
port: 8888
service2:
hostname: <service-name3>
port: 4444
# затем задаем правила, по которым необходимо использовать данные маршруты
topics-routing:
routes:
<topic_name>: service1
<topic_name2>: service2
Приоритет выбора вызываемого сервиса, по убыванию:
1. Правило из `topics-routing`;
2. Значение заголовка `x-synapse-authority`;
3. Значение параметра grpc.client.settings.default.
Для протокола REST необходимо добавить в блок конфигурации адаптера следующие настройки:
# задаем правила, по которым необходимо вызывать REST сервисы
topics-routing:
routes:
<topic_name>: <rest_endpoint> # например "http://service-name:8080/endpoint" или "http://ingress-synapse-esbfs-nt.host.ru:80/endpoint"
<topic_name2>: <rest_endpoint2>
Приоритет выбора вызываемого сервиса, по убыванию:
1. Правило из topics-routing;
2. Значение заголовоков x-synapse-authority + x-synapse-rest-port + x-synapse-rest-path;
3. Значение параметра spring.kafka.rest.default-url.
Маршрутизация и заполнение заголовков на основе parsing(s) сообщения#
Доступен parsing(s) json сообщений с помощью jsonpath, а также parsing(s) xml сообщений с помощью xpath (определение инструмента parsing(s) производится автоматически).
Недосупно для batch режима.
Доступны следующие парсеры:
1. fromBody - заполнение переменной на основе parsing(s) тела сообщения;
2. fromHeader - заполенение переменной на основе заголовка, при этом регистр не учитывается (kafka заголовка или http/grpc заголовока);
3. fromConst - заполнение переменной константой.
Все значения переменных из блока variables будут добавлены в заголовки и переданы по rest/grpc или записаны в topic(s) kafka.
По умолчанию, всем заголовкам будет добавлен префикс x-synapse-.
Например: если в названии переменной указано operationname, тогда значение данной переменной будет добавленно в заголовок x-synapse-operationname; если в названии переменной указано rquid, тогда значение данной переменной будет добавленно в заголовок x-synapse-rquid.
Если не требуется добавлять данный префикс (или требуется передать системный заголовок), то в конфигурации требуется указать addPrefix: false.
Список системных заголовково доступен по ссылке: системные заголовки Kafka Spring API.
Пример для передачи системного заголовка kafka_messageKey, который определяет значение ключа сообщения в topic(s) Kafka:
routing:
api:
routeList:
- routeName: default
destinationExpression: ('topicName')
variables:
- name: kafka_messageKey
addPrefix: false
valueFrom:
- type: fromHeader
value: x-synapse-rquid
- type: fromBody
value: $.rquid
Пример маршрутизации сообщений в зависимости от заголовка (topic(s))#
Адаптер читает два topic(s) consumer_test и consumer_test2.
При чтении сообщения определяется routeName по значению заголовка kafka_receivedtopic, в данном заголовке передается topic(s), из которого было вычитано сообщение.
Затем в случае, если данное значение соответствует consumer_test, то сообщение отправляется по адресу http://host1:8080/.
Если значение соответствует consumer_test2, то сообщение отправляется по адресу http://host2:8080/.
Если значение не соответствует ни одному из определенных из routeList, оно отправляется по адресу http://default-host:8080.
Конфигурация в случае REST взаимодействия:
spring:
kafka:
rest:
mode: true
consumer:
topics:
- topic: consumer_test
group-id: kafkaAdapter_group
enabled: true
- topic: consumer_test2
group-id: kafkaAdapter_group
enabled: true
routing:
kafka:
routeName:
valueFrom:
- type: fromHeader
value: kafka_receivedtopic
routeList:
- routeName: consumer_test
destinationExpression: ('http://host1:8080/')
- routeName: consumer_test2
destinationExpression: ('http://host2:8080/')
- routeName: default
destinationExpression: ('http://default-host:8080/')
Конфигурация в случае GRPC взаимодействия:
grpc:
client:
settings:
default:
hostname: localhost
port: 8080
service1:
hostname: host1
port: 8080
service2:
hostname: host2
port: 8008
spring:
kafka:
rest:
mode: false
consumer:
topics:
- topic: consumer_test
group-id: kafkaAdapter_group
enabled: true
- topic: consumer_test2
group-id: kafkaAdapter_group
enabled: true
routing:
kafka:
routeName:
valueFrom:
- type: fromHeader
value: kafka_receivedtopic
routeList:
- routeName: consumer_test
destinationExpression: ('service1')
- routeName: consumer_test2
destinationExpression: ('service2')
- routeName: default
destinationExpression: ('default')
Пример маршрутизации сообщений в зависимости от тега внутри JSON сообщения#
При чтении сообщения определяется routeName по значению тега evtType JSON сообщения, на основе него выполняется маршрутизация.
Пример сообщения для маршрутизации по routeName: type1 и отправки по адресу http://service-create:
{
"evtType": "type1",
"operName": "createRq"
}
Пример сообщения для маршрутизации по routeName: type2 и отправки по адресу http://update-service:
{
"evtType": "type2",
"operName": "update"
}
Пример конфигурации:
spring:
kafka:
rest:
mode: true
consumer:
topics:
- topic: consumer_test
group-id: kafkaAdapter_group
enabled: true
- topic: consumer_test2
group-id: kafkaAdapter_group
enabled: true
routing:
kafka:
routeName:
valueFrom:
- type: fromBody
value: $.evtType
routeList:
- routeName: type1
destinationExpression: ('http://' + 'service-' + ServiceName.substring(0,ServiceName.length()-2))
variables:
- name: ServiceName
valueFrom:
- type: fromBody
value: $.operName
- routeName: type2
destinationExpression: ('http://' + variables['ServiceName'] + '-service')
variables:
- name: ServiceName
valueFrom:
- type: fromBody
value: $.operName
- routeName: default
destinationExpression: ('http://default-host:8080/')
Пример маршрутизации сообщений в зависимости от тега внутри XML сообщения#
При чтении сообщения определяется routeName по значению тега evtType XML сообщения, на основе него выполняется маршрутизация.
Пример сообщения для маршрутизации по routeName: type1 и отправки по адресу http://service-create:
<xml>
<evtType>type1</evtType>
<operName>createRq</operName>
</xml>
Пример сообщения для маршрутизации по routeName: type2 и отправки по адресу http://update-service:
<xml>
<evtType>type2</evtType>
<operName>update</operName>
</xml>
Пример конфигурации:
spring:
kafka:
rest:
mode: true
consumer:
topics:
- topic: consumer_test
group-id: kafkaAdapter_group
enabled: true
- topic: consumer_test2
group-id: kafkaAdapter_group
enabled: true
routing:
kafka:
routeName:
valueFrom:
- type: fromBody
value: //evtType
routeList:
- routeName: type1
destinationExpression: ('http://' + 'service-' + ServiceName.substring(0,ServiceName.length()-2))
variables:
- name: ServiceName
valueFrom:
- type: fromBody
value: //operName
- routeName: type2
destinationExpression: ('http://' + variables['ServiceName'] + '-service')
variables:
- name: ServiceName
valueFrom:
- type: fromBody
value: //operName
- routeName: default
destinationExpression: ('http://default-host:8080/')
Исключение заголовков при чтении из topic(s) Kafka#
Доступен функционал выборочного пропуска заголовков, которые не нужны для дальнейшей обработки сообщения или мешают обработке, а так же для пропуска всех заголовков, за исключением rquid и operationname.
Для выборочного исключения необходимо добавить в конфигурации адаптера в параметр tracing.excludeHeaders.kafka список заголовков.
Пример конфигации с частичным исключении заголовков:
tracing:
excludeHeaders:
kafka:
- HEADER-TO-EXCLUDE1
- ...
- header-to-excludeN
Название заголовка вводится полностью. Обратите внимание на то, что заголовки регистрозависимы.
Для пропуска всех заголовков, кроме rquid и operationname, необходимо добавить в конфигурации адаптера в параметр tracing.excludeHeaders.kafka ключевое слово all.
При пропуске всех заголовков не будет доступен механизм фильтрации сообщений.
Пример полного исключения заголовков:
tracing:
excludeHeaders:
kafka:
- all
Исключение заголовков при вызове адаптера по REST/gRPC#
Доступен функционал выборочного пропуска заголовков, которые не нужны для дальнейшей обработки сообщения или мешают обработке, а так же для пропуска всех заголовков.
Для выборочного исключения необходимо добавить в конфигурации адаптера в параметр tracing.excludeHeaders.api список заголовков.
Пример конфигации с частичным исключении заголовков:
tracing:
excludeHeaders:
api:
- HEADER-TO-EXCLUDE1
- ...
- header-to-excludeN
Название заголовка вводится полностью. Обратите внимание на то, что заголовки регистрозависимы только для протокола gRPC. В случае REST-запроса контейнер сервлетов переводит заголовки в нижний регистр, поэтому сравнение выполняется без учета регистра.
Для пропуска всех заголовков необходимо добавить в конфигурации адаптера в параметр tracing.excludeHeaders.api ключевое слово all.
Пример полного исключения заголовков:
tracing:
excludeHeaders:
api:
- all
Отправка пачки сообщений (batch) в topic(s) Kafka#
Отправка пачки сообщений (batch) в topic(s) Kafka доступна только по REST API, контроллер /_bulk.
Первоначально строка сообщений, полученная по данному контроллеру, разделяется по параметру spring.kafka.producer.separator (по умолчанию \n).
Затем сообщения фильтруются. Если сообщение содержит хотя бы одно вхождение из списка spring.kafka.producer.skipList, то оно пропускается.
Если сообщение содержит тег agent, то оно идентифицируется, как сообщение от filebeat и в нем передается только блок message. Если сообщение не содержит тег agent, то оно идентифицируется, как сообщение от fluentbit и в нем исключется тег @timestamp
Если список skipList пуст, то сообщения не фильтруются и не модифицируются по типу filebeat/fluentbit.
Пример конфигурации:
server:
tomcat:
max-swallow-size: 40MB
threads:
max: 200
min-spare: 200
spring:
kafka:
rest:
mode: true
producer:
batch:
separator: \n
skipList:
- "\"_type\""
- "\"_index\""
properties:
acks: 1
linger.ms: 500
batch.size: 524288
buffer.memory: 104857600
compression.type: gzip
max.request.size: 52428800
Пример конфигурации агента filebeat: filebeat-bulk-api.yaml
Настройка регистрации событий аудита#
Шлюз Kafka позволяет фиксировать события аудита, для этого необходимо в конфигурации шлюза задать параметр audit.enabled равным true, а также указать параметры подключения к аудиту, пример:
audit:
enabled: true
service:
host: <адрес сервера аудита>
metamodel-route: <endpoint для регистрации метамодели аудита>
event-route: <endpoint для фиксации событий аудита>
metamodel:
version: <версия метамодели>
module-id: KFGT # идентификатор модуля
Параметры события
Параметр |
Описание |
|---|---|
Дата и время события |
Время события |
Идентификатор пользователя |
Всегда NO-USER |
Идентификатор компонента |
Всегда KFGT |
Наименование события |
Описание события |
Результат |
Успешный или неуспешный результат события |
Старое и новое значение (для событий изменения объекта) |
Значение параметров события с указанием старых и актуальных значений |
IP-адрес и hostname |
Значения IP и hostname системного компонента, являющегося источником события |
Чтобы отправить событие аудита в Единый Аудит (COTE) Platform V Monitor, в нем предварительно должна быть зарегистрирована метамодель события.
Шлюз формирует метамодель в формате JSON:
{
"metamodelVersion": "1",
"module": "KFGT",
"events": [
{
"name": "KAFKA_GW_CHANGE_CONFIG",
"description": "Изменение конфигурации шлюза Kafka",
"success": true,
"mode": "reliability",
"params": [],
"changedParams": [
{
"name": "spring.kafka.consumer",
"description": "Параметры consumer шлюза Kafka"
}
]
},
{
"name": "KAFKA_GW_AUTHENTICATION_ERROR",
"description": "Ошибка аутентификации к брокерам Kafka",
"success": false,
"mode": "reliability",
"params": [
{
"name": "BootStrap Servers",
"description": "BootStrap Servers Kafka"
}
],
"changedParams": []
},
{
"name": "KAFKA_GW_AUTHORIZATION_ERROR",
"description": "Ошибка авторизация к topic(s) Kafka",
"success": false,
"mode": "reliability",
"params": [
{
"name": "BootStrap Servers",
"description": "BootStrap Servers Kafka"
},
{
"name": "Topic",
"description": "topic(s) Kafka"
}
],
"changedParams": []
},
{
"name": "KAFKA_GW_VALUE_TOO_LARGE_ERROR",
"description": "Поле фиксируемого события превысило 32 Кб",
"success": false,
"mode": "reliability",
"params": [],
"changedParams": []
},
{
"name": "KAFKA_GW_PAYLOAD_TOO_LARGE_ERROR",
"description": "Сообщение фиксируемого события превысило 2 Мб",
"success": false,
"mode": "reliability",
"params": [],
"changedParams": []
},
{
"name": "KAFKA_GW_AUTHENTICATION_SUCCESS",
"description": "Успешная аутентификация к брокерам Kafka",
"success": true,
"mode": "reliability",
"params": [
{
"name": "BootStrap Servers",
"description": "BootStrap Servers Kafka"
}
],
"changedParams": []
},
{
"name": "KAFKA_GW_AUTHORIZATION_SUCCESS",
"description": "Успешная авторизация к topic(s) Kafka",
"success": true,
"mode": "reliability",
"params": [
{
"name": "BootStrap Servers",
"description": "BootStrap Servers Kafka"
},
{
"name": "Topic",
"description": "topic(s) Kafka"
}
],
"changedParams": []
},
{
"name": "KAFKA_GW_AUDIT_INIT",
"description": "Успешная инициализация отправки событий в Аудит",
"success": true,
"mode": "reliability",
"params": [
{
"name": "BootStrap Servers",
"description": "BootStrap Servers Kafka"
}
],
"changedParams": []
}
]
}
При старте шлюз подключится к endpoint АС Аудит, зарегистрирует метамодель и начнет отправку событий.
Одна и та же версия метамодели может регистрироваться произвольное количество раз, но при этом описание метамодели не должно меняться. Поэтому версия метамодели вместе с самой метамоделью определена в коде шлюза и меняется только при перепоставке.
Это гарантирует, что шлюзы одной и той же версии установленные в разных проектах всегда будут оправлять консистентную модель.
При возникновении ошибок отправки, шлюз сохраняет событие в локальном буфере в памяти приложения и пытается отправить его повторно. Количество попыток настраивается в параметре audit.event.retry.max-attempts. Количество событий, которое может быть сохранено в буфере настраивается в параметре audit.http.maxBufferSize.
Если возникает ошибка при регистрации модели, то шлюз не будет запущен.
Настройка часового пояса#
Для того чтобы установить часовой пояс логов приложения, необходимо добавить переменную TZ в Deployment со значением часового пояса, например:
env:
- name: TZ
value: Europe/Moscow
Настройка работы consumer в режиме batch (чтение пачки сообщений)#
Для того, чтобы адаптер считывал сообщения из топика пачками необходимо включить параметр spring.kafka.consumer.batching=true.
Тогда сообщения будут передаваться в следующем формате:
Если сообщения в топике в формате JSON, где
message- сообщение, прочитанное из топика,headers- список заголовков, указаных в топике:
{
"batchMassages": [{
"message": <json message>,
"headers": {
"key1": "value1"
}
}
]
}
Например:
{
"batchMassages": [{
"message": {
"test1": "test1"
},
"headers": {
"key1": "value1",
"key2": "value2"
}
}, {
"message": {
"test2": "test2"
},
"headers": {
"key1": "value1",
"key2": "value2"
}
}
]
}
Если сообщения в топике в формате XML, где Body - сообщение, прочитанное из топика, Headers - список заголовков, указаных в топике:
<BatchMessages>
<Message>
<Body>
<xml message>
</Body>
<Headers>
<key1>value1</key1>
</Headers>
</Message>
</BatchMessages>
Например:
<BatchMessages>
<Message>
<Body>
<Rq1>
<RqUID>2</RqUID>
<Tag>value</Tag>
</Rq1>
</Body>
<Headers>
<key1>value1</key1>
<key2>value2</key2>
</Headers>
</Message>
<Message>
<Body>
<Rq2>
<RqUID>2</RqUID>
<Tag>value</Tag>
</Rq2>
</Body>
<Headers>
<key1>value1</key1>
<key2>value2</key2>
</Headers>
</Message>
</BatchMessages>
С помощью параметра max.poll.records в настройках kafka consumer properties можно ограничить максимальное значение одноврменно прочитанных сообщений.
Часто встречающиеся проблемы и пути их устранения#
Проблема |
Причина |
Решение |
|---|---|---|
Не стартует Pod приложения |
Недостаточно ресурсов |
Увеличить запрашиваемые ресурсы для приложения |
Не стартует Pod приложения |
Нет доступной node(s) для запуска |
Зарегистрировать обращение в поддержку инфраструктуры |
Не стартует Pod приложения |
Ошибка в конфигурации |
Выгрузить лог, провести анализ, скорректировать конфигурацию |
Частый рестарт контейнера приложения |
Медленная загрузка приложения |
Увеличить задержку и/или интервал опроса Liveness пробы |
Частый рестарт контейнера приложения |
Недостаточно ресурсов |
Увеличить запрашиваемые ресурсы для приложения |
Ошибка при вызове по gRPC |
Нет доступных подов сервиса, которому направлен вызов |
Проверить состояние сервиса - получателя вызова |
Ошибка при вызове по gRPC |
Ошибки в конфигурации сервисного прокси |
Выгрузить лог сервисного прокси, проанализировать, устранить ошибки |
Ошибка при вызове по Kafka |
Отсутствует topic(s), к которому требуется подключиться |
Создать topic(s) в Kafka |
Ошибка при вызове по Kafka |
Ошибки в конфигурации сервисного прокси |
Выгрузить лог сервисного прокси, проанализировать, устранить ошибки |
Ошибка при вызове по Kafka |
Параметры подключения к Kafka некорректны |
Сопоставить параметры config map с параметрами Apache Kafka, к которому требуется подключиться и устранить ошибки |