Руководство по работе с сервисом DataSpace Subscription#
О документе#
В составе продукта Platform V DataSpace (APT) поставляется сервис DataSpace Subscription, реализующий универсальный (повторно используемый разными потребителями) механизм, позволяющий DataSpace участвовать в событийном обмене (выступать в роли провайдера событий).
В данном документе приведено описание настройки сервиса DataSpace Subscription и работы с ним.
Расшифровку основных понятий см. в перечне "Терминов и определений".
Требования к модели#
В модели данных необходимо описать события (event), которые будут обрабатываться при работе сервиса DataSpace Subscription.
Событие может выступать как в роли самостоятельного агрегата, так и являться частью одного из существующих агрегатов в модели, для этого у события должна быть объявлена ссылка на родительский класс (parent="true"). Событие, у которого отсутствует родительская ссылка, является самостоятельным агрегатом.
Отличия Event от класса:
для него определена только операция создания;
служит только для подписки на него;
не наследуется и имеет плоскую структуру;
допустима ссылка на родительский класс для формирования агрегатной связи.
Event, у которого отсутствует родительская ссылка, является самостоятельным агрегатом;обработанный всеми подписками
Eventавтоматически удаляется по истечении времени;не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.
Пример описания события, являющегося самостоятельным агрегатом:
<event name="MyEvent">
<property name="message" type="String"/>
</event>
Пример события, которое входит в агрегат Product:
<model>
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
</class>
<event name="ProductEvent">
<property name="message" type="String"/>
<property name="product" type="Product" parent="true"/>
</event>
</model>
Отдельным видом событий являются события слияния/разъединения. В модели они обозначаются признаком merge-event="true". Для
событий слияния/разъединения обязательно наличие ссылки на родительский класс:
<model>
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
<property name="code" type="String" label="Код"/>
<property name="oneToOneLink" type="One2OneClassLink" mappedBy="product" label="o2o связь"/>
<property name="services" type="Service" collection="set" mappedBy="product" label="Коллекция сервисов"/>
</class>
<class name="Service" label="Сервис" lockable="true">
<property name="name" type="String" label="Наименование"/>
<property name="code" type="String" label="Код"/>
<property name="product" type="Product" label="продукт" parent="true" index="true"/>
</class>
<class name="One2OneClassLink" label="Класс для связи o2o">
<property name="name" type="String" label="Наименование"/>
<property name="product" type="ProductParty" parent="true"/>
</class>
<event name="ProductClientMergeEvent" merge-event="true">
<reference name="client" type="Client" label="Идентификатор клиента"/>
<property name="product" type="Product" parent="true"/>
</event>
</model>
Для свойств событий допустимо указание тегов: name, type, parent, label, length, precision, mandatory, default-value.
События слияния/разъединения создаются автоматически сервисом DataSpace ReferenceUpdater.
Спецификация подписок#
Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml. При сборке проекта файл попадает в конфигурацию DataSpace Subscription. При старте сервис DataSpace Subscription читает этот файл и загружает подписки в таблицу T_DSPC_SYS_SUBSCRIPTION.
Пример:
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="mergeSubscription1"
name="EPKMergeForProduct"
description="Публикация события с слиянии во внешнюю систему"
target="REST"
eventType="ProductClientMergeEvent"
callback="${my.merge.subscription.1.url}/api/v1/products"
validTill="20240101T00:00:00Z"
maxRetryAttempts="${my.merge.subscription.1.nretry}"
timeoutMs="${my.merge.subscription.1.timeout}"
retryDelayMs="${my.merge.subscription.1.retrydelay}"
async="false"
blocking="true">
<criteria>
root.fromClient=='2'
</criteria>
<query>
{
searchProduct(cond: "it.$id==''${product}''") {
elems {
code
name
services {
elems {
code
name
}
}
oneToOneLink {
name
}
}
}
}
</query>
<template>
[
{
"operation": "shift",
"spec": {
"data": {
"searchProduct": {
"elems": {
"*": {
"code": "ProductCodeMappedTo",
"oneToOneLink": {
"name": "oneToOneLinkNameMappedTo"
},
"services": {
"elems": {
"*": {
"code": "servicesMappedTo"
}
}
}
}
}
}
}
}
}
]
</template>
<headers>
-XTenantId=${my.merge.subscription.1.tenantId}
</headers>
</subscription>
</subscriptions>
Описание полей:
id— уникальный идентификатор подписки, используется в качестве primary key в таблице подписокT_DSPC_SYS_SUBSCRIPTION.Внимание!
Значение "0" зарезервировано системой.
name— наименование подписки.description— текстовое описание подписки.target— способ публикации (REST,KAFKA,GRAPHQLSUBS).eventType— тип событий (entity name), на которые осуществляется подписка.callback— URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_топика> в случае Kafka.validTill— время окончания действия подписки.maxRetryAttempts— количество повторных попыток публикации (актуально только для target == REST).timeoutMs— тайм-аут публикации, в миллисекундах (актуально только для target == REST).retryDelayMs— задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST).async— использование асинхронной отправки (true/false).blocking— приостановление отправки при первой ошибке (true/false).criteria— строковое выражение для фильтрации записей события. С описанием формата строковых выражений можно ознакомиться в документе "Строковые выражения".query— текст GraphQL-запроса для чтения дополнительных данных. С GraphQL-API можно ознакомиться в документе "Протокол GraphQL компонента DataSpace Core". Текст запроса может содержать placeholder формата ${<наименование_свойства_события>}. Перед выполнением GraphQL-запроса, placeholder будут заменены на соответствующие значения свойств события. <наименование_свойства_события> — наименование свойства JPA-класса события entity,nameкоторого прописан в полеeventTypeу конкретного subscription в subscriptions.xml.template— спецификация трансформации JSON-данных в JSON-сообщение. Для трансформации по умолчанию используется библиотека JOLT, поэтому для описания преобразования в параметре template используется формат спецификации этой библиотеки. Исходные данные для трансформации по шаблону имеют общий вид:{ "event": { <атрибуты события> }, "data": { <результат выполнения graphql-запроса. Если graphql-запрос в подписке не задан или не вернул данных - элемент пустой> } }headers— спецификация дополнительных заголовков сообщения. Формат: <наименование_заголовка>=<значение>. <значение> может содержать Placeholder формата ${<наименование_свойства_события>}, которые будут заменены аналогично логике, описанной для поляquery.
Примечание
На старте сервиса DataSpace Subscription проводится валидация полей
criteria,query,template. Если валидация не проходит, сервис завершает свою работу.
Важно
Поля
criteria,query,templateиheadersне могут содержать пустое значение. Если не требуется задавать значение для этих полей, то не нужно указывать соответствующие теги.
Стендозависимая конфигурация subscriptions.xml#
Потребитель может использовать в значениях полей target, callback, template, headers, maxRetryAttempts, timeoutMs, retryDelayMs Placeholder, которые будут заполняться при загрузке модуля из конфигурации приложения, которая в свою очередь может быть параметризована и сделана стендозависимой.
Для этого необходимо добавить в файл customer-properties.subscription.yaml список параметров со значениями. В качестве значения может быть использован также параметр развертывания, например, {{tenantId}}. Эти параметры должны быть перечислены также в файле custom_property.conf.yml, могут быть как со значениями по умолчанию, так и без значений.
При установке на стенд этим параметрам должны быть заданы значения в стендозависимой конфигурации.
Подробнее см. в документе "Руководство по установке" в следующих разделах:
"Добавление пользовательских параметров в дистрибутив".
"Использование пользовательских параметров в рамках сервиса DataSpace".
Пример customer-properties.subscription.yaml:
kind: ConfigMap
apiVersion: v1
metadata:
name: customer-properties
data:
customer.properties: |-
my.merge.subscription.1.url={{ customer.value1 }}
my.merge.subscription.1.nretry=1
my.merge.subscription.1.timeout={{ customer.value2 }}
my.merge.subscription.1.retrydelay=2000
my.merge.subscription.1.tenantId={{ customer.value3 }}
Пример custom_property.conf.yml:
customer.value1=http://localhost:8082
customer.value2=10000
customer.value3=tenantId
# ...
Гарантия последовательности публикации событий
Для обеспечения гарантии последовательности отправки событий по агрегату необходимо для подписки:
выключить async-режим;
включить blocking-режим.
Процессы и масштабирование#
Основную логику сервиса DataSpace Subscription реализуют процессы EVENT_TRANSFER и EVENT_WORKER.
Задача процесса EVENT_TRANSFER состоит в том, чтобы для каждого события, по которому есть валидные подписки, сформировать записи в таблице очереди на публикацию T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего событие считается обработанным. При этом для всех выбранных новых Event (для которых нашлись валидные подписки и были созданы записи в T_DSPC_SYS_EVENT_QUEUE_ITEM, а также для которых подписок не нашлось) статус меняется на PROCESSED.
Процесс EVENT_WORKER обрабатывает таблицу очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM: читает соответствующее событие с применением условия criteria подписки, выполняет запрос дополнительных данных, используя query, трансформирует данные в итоговое сообщение по шаблону template и осуществляет публикацию сообщения на основании target и callback.
Предусмотрена возможность масштабирования процессов EVENT_TRANSFER и EVENT_WORKER. За количество процессов, которые будут запущены, отвечают настройки dataspace.subscription.event-transfer.transfer-processes-count и dataspace.subscription.event-worker.worker-processes-count соответственно. Реализована возможность распределения процессов по партициям для того, чтобы процессы обрабатывали каждый свой набор данных.
За количество партиций для процессов EVENT_TRANSFERи EVENT_WORKER отвечают настройки dataspace.subscription.event-transfer.partitions-number и dataspace.subscription.event-worker.partitions-number соответственно. Партиции ведутся в разрезе подписок. Для процессов EVENT_TRANSFER зарезервирован идентификатор подписки со значением "0".
Внимание!
Настройки количества партиций для процессов
EVENT_TRANSFERиEVENT_WORKERдолжны иметь одинаковые значение для всех экземпляров сервиса DataSpace Subscription, работающих с одной БД. При рассогласовании значений данных настроек между разными экземплярами процессы будут брать в работу один и тот же набор данных, что приведет к нарушению порядка обработки событий.Также запрещено менять данные настройки в runtime.
Служебная таблица T_DSPC_SYS_SUBSCRIPTION_WORKER содержит информацию о партициях в разрезе подписок. Недостающие партиции добавляет служебный процесс на старте сервиса. Процессы EVENT_TRANSFER и EVENT_WORKER в процессе своей работы проверяют таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER на предмет необрабатываемых партиций и берут их в работу, прописывая свой идентификатор в поле PROCESSID. Партиция считается необрабатываемой, если PROCESSID не установлен, либо ее heartbeat не обновлялся определенное время, которое задается в настройках dataspace.subscription.event-transfer.heartbeat-timeout-sec и dataspace.subscription.event-worker.heartbeat-timeout-sec для процессов EVENT_TRANSFER и EVENT_WORKER соответственно.
Набор данных разбивается по партициям при помощи предварительно рассчитанного хеша, который хранится в поле PARTITIONHASH в таблицах
событий и очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM. При обработке записей процессы EVENT_TRANSFER и EVENT_WORKER вычисляют партицию, к которой принадлежит запись по формуле PARTITIONHASH mod <количество_партиций>. Если полученная партиция присутствует среди партиций, назначенных процессу, то он выполняет обработку записи.
Таблица T_DSPC_SYS_SUBSCRIPTION_PROCESS также способствует управлению масштабированием процессов. При старте сервиса процессы регистрируются в данной таблице, записывая свой идентификатор в поле OBJECT_ID. В процессе работы процессы пересчитывают значение поля MAXPARTITIONS, которое содержит максимальное количество партиций, обрабатываемых процессом, по следующей формуле: <количество_подписок> * <количество_партиций> / <количество_процессов>. Если процессу назначено больше партиций, чем MAXPARTITIONS, то он освободит нужное количество.
Служебные таблицы#
T_DSPC_SYS_SUBSCRIPTION
Таблица подписок. Данные загружаются из файла subscriptions.xml при старте сервиса.
Поле |
Описание |
|---|---|
OBJECT_ID |
идентификатор подписки, задает пользователь (значение "0" зарезервировано системой) |
NAME |
имя подписки (для отображения, включая labels мониторинга) |
DESCRIPTION |
описание подписки |
EVENTTYPE |
entity name событий, на которые осуществляется подписка |
CRITERIA |
строковое выражение для фильтрации записей события |
QUERY |
текст GraphQL-запроса для чтения дополнительных данных |
TEMPLATE |
спецификация трансформации JSON-данных в JSON-сообщение |
TARGET |
способ публикации (REST, KAFKA, GRAPHQLSUBS) |
CALLBACK |
URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_топика> в случае Kafka |
HEADERS |
спецификация дополнительных заголовков сообщения |
ASYNC |
признак использования асинхронной отправки (true/false) |
BLOCKING |
признак приостанавливания отправки при первой ошибке (true/false) |
MAXRETRYATTEMPTS |
количество повторных попыток публикации (актуально только для target == REST) |
RETRYDELAYMS |
задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST) |
TIMEOUTMS |
тайм-аут публикации, в миллисекундах (актуально только для target == REST) |
VALIDTILL |
время окончания действия подписки |
SYS_OWNERID |
владелец подписки (ID тенанта) |
SYS_LASTCHANGEDATE |
дата последнего изменения записи |
T_DSPC_SYS_EVENT_QUEUE_ITEM
Таблица очереди обработки событий. Заполняется процессами EVENT_TRANSFER. Обрабатывается процессами EVENT_WORKER.
Поле |
Описание |
|---|---|
OBJECTID_EVENTID |
ссылка на событие (часть первичного ключа) |
OBJECTID_SUBSCRIPTIONID |
ссылка на подписку (часть первичного ключа) |
EVENTTYPE |
entity name события |
STATUS |
статус обработки элемента очереди событий (NEW, SENT, ERROR, SKIP) |
AGGREGATEROOTTYPE |
тип корня агрегата Event, определяется по метаданным |
AGGREGATEROOTID |
идентификатор корня агрегата события |
PARTITIONHASH |
предварительно рассчитанный хеш для деления по партициям |
CREATIONTIMESTAMP |
время добавления события в очередь |
SYS_LASTCHANGEDATE |
дата последнего изменения записи |
Значения поля STATUS:
NEW— новый элемент очереди событий, еще не был обработан;SENT— элемент очереди событий успешно обработан, сообщение отправлено;ERROR— произошла ошибка при обработке, элемент будет обработан повторно;SKIP— подходящее событие не найдено в процессе обработки элемента очереди событий (скорее всего событие не прошло по условию, указанному в поле criteria подписки).
T_DSPC_SYS_SUBSCRIPTION_WORKER
Поле |
Описание |
|---|---|
OBJECTID_PARTITIONNUMBER |
номер партиции (часть первичного ключа) |
OBJECTID_SUBSCRIPTIONID |
ссылка на подписку (часть первичного ключа) |
HEARTBEAT |
время последнего обновления |
PROCESSID |
идентификатор процесса |
STATE |
состояние обработки партиции (ACTIVE, CIRCUIT BREAKING, UNASSIGNED) |
T_DSPC_SYS_SUBSCRIPTION_PROCESS
Поле |
Описание |
|---|---|
OBJECT_ID |
идентификатор процесса (первичный ключ) |
MAXPARTITIONS |
максимальное количество партиций, обрабатываемых процессом |
KIND |
вид процесса (worker / перекладчик) (EVENT_WORKER, EVENT_TRANSFER) |
HEARTBEAT |
время последнего обновления |
Конфигурация#
dataspace.subscription.event-transfer.transfer-processes-count
#Значение по умолчанию: 1
#Количество процессов перекладчика (EVENT_TRANSFER)
dataspace.subscription.event-transfer.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы перекладчика
dataspace.subscription.event-transfer.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов перекладчика (EVENT_TRANSFER)
dataspace.subscription.event-transfer.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс перекладчика
dataspace.subscription.event-transfer.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс перекладчика будет считаться неактивным
dataspace.subscription.event-transfer.max-transfer-count
#Значение по умолчанию: 1000
#Количество событий, обрабатываемых перекладчиком за один цикл работы
dataspace.subscription.event-worker.worker-processes-count
#Значение по умолчанию: 1
#Количество процессов воркера (EVENT_WORKER)
dataspace.subscription.event-worker.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы воркера
dataspace.subscription.event-worker.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов воркера (EVENT_WORKER)
dataspace.subscription.event-worker.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс воркера
dataspace.subscription.event-worker.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс воркера будет считаться неактивным
dataspace.subscription.event-worker.circuit-breaker-error-count-threshold
#Значение по умолчанию: 10
#Количество ошибок по подписке, по достижении которого включается circuit breaker на эту подписку
dataspace.subscription.event-worker.circuit-breaker-timeout-ms
#Значение по умолчанию: 30000 миллисекунд
#Время с момента включения circuit breaker на подписку, по истечении которого circuit breaker будет выключен
dataspace.subscription.service-tasks.pool-size
#Значение по умолчанию: 5
#Количество потоков в пуле служебных процессов
dataspace.subscription.service-tasks.delete-dead-processes-schedule-delay-ms
#Значение по умолчанию: 10000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который удаляет неактивные процессы EVENT_TRANSFER и
#EVENT_WORKER из таблицы T_DSPC_SYS_SUBSCRIPTION_PROCESS
dataspace.subscription.service-tasks.load-subscriptions-schedule-delay-ms
#Значение по умолчанию: 1000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который выполняет загрузку подписок из файла subscriptions.xml
#в таблицу T_DSPC_SYS_SUBSCRIPTION, а также вставку недостающих партиций в таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER.
#После успешной загрузки процесс перестает запускаться
dataspace.subscription.service-tasks.cleanup-depth-hr
#Значение по умолчанию: 48 часов
#Максимальное время хранения записей в таблицах событий и таблице очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего
#записи будут удалены фоновым процессом, если они уже были обработаны: status == PROCESSED для событий
#и status == SENT для записей T_DSPC_SYS_EVENT_QUEUE_ITEM
dataspace.subscription.service-tasks.cleanup-cron
#Значение по умолчанию: 0 0 0 * * * (каждый день в 00:00)
#Расписание запуска служебного процесса, который удаляет отработанные старые записи из таблиц событий и
#таблицы очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM
dataspace.subscription.service-tasks.tasks-schedule-initial-delay-ms
#Значение по умолчанию: 90000 миллисекунд
#Задержка перед запуском всех фоновых процессов на старте сервиса. Применяется для обеспечения корректной установки health-метрики
#readiness на старте сервиса.
Если предполагается использование функциональности публикации событий в Kafka, то необходимо задать настройки для каждого кластера Kafka в файле customer-properties.subscription.yaml в следующем виде:
Примечание
В случае использовании Helm-инсталляции в рамках раздела "Ручная установка сервиса" документа "Руководство по установке" необходимо создать
kind: Secretв среде контейнеризации с параметрами, указанными ниже, и задать имя полученного секрета в параметр в Values.yml:appConfig: subscription: config: "cm-dspc-subscriptionstub"
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.bootstrap-servers=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.key-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.value-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.acks=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.delivery-timeout-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.request-timeout-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.retry-backoff-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.linger-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotence=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.max-in-flight-requests-per-connection=
#Если требуется SSL:
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.security-protocol=SSL
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-location=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-location=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-endpoint-identification-algorithm=
При необходимости параметры можно сделать стендозависимыми по аналогии со стендозависимой конфигурацией subscriptions.xml (см. раздел "Стендозависимая конфигурация subscriptions.xml").
Пример customer-properties.subscription.yaml:
kind: ConfigMap
apiVersion: v1
metadata:
name: customer-properties
data:
customer.properties: |-
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.bootstrap-servers={{ local.kafka.bootstrap.servers }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.acks={{ local.kafka.acks }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.delivery-timeout-ms={{ local.kafka.delivery.timeout.ms }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.request-timeout-ms=200
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.retry-backoff-ms=500
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.linger-ms={{ local.kafka.linger.ms }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.enable-idempotence={{ local.kafka.enable.idempotence }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.max-in-flight-requests-per-connection={{ local.kafka.max.in.flight.requests.per.connection }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.security-protocol=SSL
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-location=/deployments/credentials/kafkaSslKeystore
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-password=12456
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-location=/deployments/credentials/kafkaSslTrustwtore
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-password=54321
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-endpoint-identification-algorithm=
Пример custom_property.conf.yml:
local.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
local.kafka.acks=all
local.kafka.delivery.timeout.ms=5000
local.kafka.linger.ms=0
local.kafka.enable.idempotence=true
local.kafka.max.in.flight.requests.per.connection=5
# ...
Важно
При использовании SSL, установка значения true для параметра
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotenceможет приводить к ошибкеorg.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. Для того чтобы этого избежать, необходимо корректным образом настроить операциюIdempotentWriteдля кластера. Подробнее см. в официальной документации Kafka
В файле subscriptions.xml target необходимо задавать в формате <наименование_кластера>:<наименование_топика>.
Пример: LOCAL_KAFKA:test-topic.
Retry#
Параметры retry/timeout для Kafka задаются в настройках Kafka на конкретный кластер <наименование_кластера>. Для этого необходимо использовать параметры delivery-timeout-ms, request-timeout-ms, retry-backoff-ms.
Параметры retry/timeout для REST берутся из параметров подписки: maxRetryAttempts, timeoutMs, retryDelayMs. Для ошибок с http status code 4xx retry не выполняется (то есть retry — только для 5xx и в случае тайм-аута).
Для идемпотентности потребитель может определить необходимый заголовок (передавая какое-либо из полей event с уникальным значением).
Circuit breaking#
На уровне параметра подписки blocking вводится настройка режима обработки ошибки:
blocking=true— при первой ошибке запрет на отправку остальных публикаций по партиции подписки.blocking=false— разрешена публикация других событий по подписке до наступления условия приостановки.
Поведение при ошибках:
Производится фиксация ошибки. Статус "ERROR" устанавливается для элемента очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM.
Если параметр
blocking=false, то продолжается обработка следующих сообщений, при этом счетчик ошибок по подписке увеличивается.Если параметр
blocking=true, то полностью приостанавливается отправка по этой партиции, при этом счетчик ошибок по подписке увеличивается.При накоплении порогового значения ошибок по подписке (настройка
dataspace.subscription.event-worker.circuit-breaker-error-count-thresholdравна, например, "10") приостанавливается отправка по этой подписке. При этом включается circuit breaker на всю подписку на один экземпляр приложения.Спустя время, указанное в настройке
dataspace.subscription.event-worker.circuit-breaker-timeout-ms, производится попытка повторной отправки, начиная с тех, чья отправка завершилась с ошибкой.Успешная отправка сбрасывает счетчик ошибок.
Примечание
Каждый процесс EVENT_WORKER самостоятельно принимает решение о приостановке и возобновлении обработки партиции. Для того чтобы не останавливать весь поток при ошибке в данных одной из публикаций приостановится только одна партиция.
Мониторинг#
Мониторинг включается настройкой:
dataspace.subscription.metrics.enabled
#Значение по умолчанию: false — метрики выключены
#true — метрики включены
Реализованы следующие метрики мониторинга:
Наименование метрики |
Тип метрики |
Теги |
Описание |
|---|---|---|---|
dspc.subscription.handle.time |
Timer |
|
Время обработки событий в разрезе подписки и результата отправки |
dspc.subscription.publication.time |
Timer |
|
Время отправки сообщений в разрезе подписки и результата отправки |
dspc.subscription.errors.count |
Counter |
|
Количество ошибок отправки событий в разрезе подписки |
dspc.subscription.skip.count |
Counter |
|
Количество отсеянных событий в разрезе подписки |
dspc.subscription.transfer.count |
Counter |
Скорость перекладки событий |
|
dspc.subscription.partition.state |
Gauge |
|
Статус обработки партиций — количество по T_DSPC_SYS_SUBSCRIPTION_WORKER в разрезе state и subscriptionId |
dspc.subscription.event.queue.size |
Gauge |
|
Количество элементов очереди обработки событий в таблице T_DSPC_SYS_EVENT_QUEUE_ITEM в разрезе подписок и статусов. |
dspc.subscription.new.events.count |
Gauge |
|
Отображает количество событий со статусом |
Метрики dspc.subscription.partition.state, dspc.subscription.event.queue.size и dspc.subscription.new.events.count обновляются фоновым scheduled-процессом. Частота обновления
данных метрик регулируется настройкой:
dataspace.subscription.metrics.schedule-delay-ms
#Значение по умолчанию: 10000 миллисекунд
Клиентский путь#
Порядок действий:
Определить список событий для публикации.
Добавить в проекте DataSpace в модель данных model.xml соответствующие этим событиям Event с необходимым для публикации набором атрибутов.
Создать в проекте DataSpace конфигурацию подписок subscriptions.xml, предусмотрев необходимые параметры для стендозависимой параметризации.
В конфигурации подписок описать шаблон трансформации данных события (и результата выполнения
query) в JSON для публикации.Сформировать файлы
customer-properties.subscription.yamlиcustom_property.conf.yml, тем самым определив стендозависимую конфигурацию.Выполнить сборку проекта DataSpace.
Доработать приложение для создания событий в необходимых узлах бизнес-процесса (вместе с изменением бизнес-данных или независимо).
Подготовить интеграционное окружение для публикации: кластер Kafka, топики, манифесты Kubernetes и Istio для сетевых взаимодействий, конфигурации Secret Management System для получения и ротации сертификатов и другое.
Развернуть DataSpace и приложение, протестировать интеграцию.
Провести нагрузочное тестирование, по результатам уточнить параметры масштабирования сервиса.
Выпустить релиз приложения совместно с DataSpace.
Пример передачи в "Каталог продуктов" сведений об изменении клиента по продукту при слиянии/разъединении#
Входное состояние: приложение развернуто с DataSpace, настроено слияние/разъединение Master Data Management.
Порядок действий:
Создать модель с событием слияния. model.xml:
<model name="rko-product" version="0.1"> <external-entity name="ClientEPK" merge-kind="Organization"/> <class name="RKOProduct"> <property name="code" type="String" length="32"/> <property name="curISO" type="String" length="3"/> <property name="contractNumber" type="String" length="64"/> <reference name="client" type="ClientEPK" description="По этому референсу выполняется слияние"/> </class> <!-- добавляем в модель событие слияния/разъединения client в RKOProduct --> <event name="RKOMergeEvent" merge-event="true"> <property name="rko" type="RKOProduct" parent="true"/> </event> </model>Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:
<subscriptions xmlns="DataspaceSubscriptions"> <subscription id="mergeRKOForPPK" name="MergeRKOForPPK" description="Событие о слиянии клиента продукта для каталога продуктов" target="KAFKA" eventType="RKOMergeEvent" callback="PPK_KAFKA:${rko.merge.subscription.1.topic}" validTill="9999-12-31T23:59:59.999Z" async="false" blocking="true"> <query> { searchRKOProduct(cond: "it.$id==''${rko}''") { elems { id code contractNumber curISO client } } } </query> <!-- структура данных на входе преобразования --> <!-- { "event": { "objectId": "someEventId11111 ", "creationTimestamp": "2023-04-01T22:22:22.001Z", "type": "RKOMergeEvent", "status": "processed", "lastChangeDate": "2023-04-01T22:22:23.551Z", "ownerId": "someOwnerId01", "rko": "1231415534646745", "aggregateRootId": "1231415534646745", "mergeKind": "Organization", "referenceName": "client", "fromReference": "1999449494944077", "toReference": "1999449494944942" }, "data": { "searchRKOProduct": { "elems": { "id": "1231415534646745", "code": "RKO", "contractNumber": "123141553464", "curISO": "RUB", "client": "1999449494944942" } } } } --> <!-- Ожидаемый результат --> <!-- { "Contract": { "ProductCode": "RKO", "ContractID": "1231415534646745", "epkOrgId": "1999449494944942", "ContractNumber": "123141553464", "CurrencyIso": "RUB" } } --> <template> [ { "operation": "shift", "spec": { "event": { "rko": "Contract.ContractID", "toReference": "Contract.epkOrgId" }, "data": { "searchRKOProduct": { "elems": { "code": "Contract.ProductCode", "contractNumber": "Contract.ContractNumber", "curISO": "Contract.CurrencyIso" } } } } } ] </template> <headers> -XTenantId=${rko.merge.subscription.1.tenantId} </headers> </subscription> </subscriptions>Настроить подключение к Kafka для публикации событий.
Определить стендозависимую конфигурацию. customer-properties.subscription.yaml:
kind: ConfigMap apiVersion: v1 metadata: name: customer-properties data: customer.properties: |- rko.merge.subscription.1.topic={{ mergeRKOForPPK.topic.value }} rko.merge.subscription.1.tenantId={{ mergeRKOForPPK.tenantId.value }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.bootstrap-servers={{ mergeRKOForPPK.kafka.bootstrap.servers }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer dataspace.subscription.publisher.kafka.config.PPK_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer dataspace.subscription.publisher.kafka.config.PPK_KAFKA.acks={{ mergeRKOForPPK.kafka.acks }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.delivery-timeout-ms={{ mergeRKOForPPK.kafka.delivery.timeout.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.request-timeout-ms={{ mergeRKOForPPK.kafka.request.timeout.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.retry-backoff-ms={{ mergeRKOForPPK.kafka.retry.backoff.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.linger-ms={{ mergeRKOForPPK.kafka.linger.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.enable-idempotence={{ mergeRKOForPPK.kafka.enable.idempotence }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.max-in-flight-requests-per-connection={{ mergeRKOForPPK.kafka.max.in.flight.requests.per.connection }}custom_property.conf.yml:
mergeRKOForPPK.topic.value=topic-name mergeRKOForPPK.tenantId.value=tenantId mergeRKOForPPK.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka> mergeRKOForPPK.kafka.acks=all mergeRKOForPPK.kafka.delivery.timeout.ms=5000 mergeRKOForPPK.kafka.request.timeout.ms=200 mergeRKOForPPK.kafka.retry.backoff.ms=500 mergeRKOForPPK.kafka.linger.ms=0 mergeRKOForPPK.kafka.enable.idempotence=true mergeRKOForPPK.kafka.max.in.flight.requests.per.connection=5 # ...Собрать и развернуть приложение.
Выполнить слияние.
Пример уведомления о состоянии обработки заявки#
Порядок действий:
Создать модель с событием изменения статуса заявки. model.xml:
<model name="rko-product" version="0.2"> <external-entity name="ClientEPK" merge-kind="Individual"/> <enum name="ApplicationStatus"> <value name="DRAFT"/> <value name="SUBMITTED"/> <value name="APPROVED"/> <value name="REJECTED"/> <value name="CANCELLED"/> </enum> <class name="Application"> <property name="code" type="String" length="32"/> <property name="name" type="String" length="254"/> <property name="ref" type="String" length="64" unique="true"/> <reference name="client" type="ClientEPK" description="По этому референсу выполняется слияние/разъединение"/> <property name="content" type="Text"/> <property name="applicationStatus" type="String" length="32"/> </class> <!-- добавляем в модель событие изменения статуса заявки --> <event name="StatusChangeEvent"> <property name="application" type="Application" parent="true" /> <property name="reason" type="String" length="2048" description="Причина изменения статуса"/> <property name="eventUser" type="String" length="254" description="Идентификатор пользователя, действия которого привели к изменению статуса" /> </event> </model>Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:
<subscriptions xmlns="DataspaceSubscriptions"> <subscription id="applicationStatusNotify" name="applicationStatusNotify" description="Событие об изменении статуса заявки" target="REST" eventType="StatusChangeEvent" callback="${baseWebHookUrl}/api/v1/statusNotify" validTill="9999-12-31T23:59:59.999Z" maxRetryAttempts="${applications.subscription.1.nretry}" timeoutMs="${applications.subscription.1.timeout}" retryDelayMs="${applications.subscription.1.retrydelay}" async="true" blocking="true"> <query> { searchApplication(cond: "it.$id==''${application}''") { elems { id code name ref applicationStatus } } } </query> <!-- структура данных на входе преобразования --> <!-- { "event": { "objectId": "someEventId555 ", "creationTimestamp": "2023-04-01T22:22:22.001Z", "type": "StatusChangeEvent", "status": "processed", "lastChangeDate": "2023-04-01T22:22:23.551Z", "ownerId": "someOwnerId02", "application": "1231415534646745", "aggregateRootId": "1231415534646745", "reason": "Application successfully passed all checks and has been approved", "eventUser": "xxx-uuuuuuu-zzz" }, "data": { "searchApplication": { "elems": { "id": "1231415534646745", "code": "APP-X333", "name": "Application for grant", "ref": "APP-X333-113333332", "applicationStatus": "APPROVED" } } } } --> <!-- Ожидаемый результат --> <!-- { "ApplicationReference": "APP-X333-113333332", "ApplicationCode": "APP-X333", "ApplicationName": "Application for grant", "Status": "APPROVED", "Reason": "Application successfully passed all checks and has been approved", "Timestamp": "2023-04-01T22:22:22.001Z" } --> <template> [ { "operation": "shift", "spec": { "event": { "reason": "Reason" "creationTimeStamp": "Timestamp" }, "data": { "searchApplication": { "elems": { "code": "ApplicationCode", "name": "ApplicationName", "ref": "ApplicationReference", "applicationStatus": "Status" } } } } } ] </template> <headers> -XTenantId=${applications.subscription.1.tenantId} -XchangeUser=${eventUser} </headers> </subscription> </subscriptions>Сконфигурировать подключение к REST API для публикации.
Определить стендозависимую конфигурацию. customer-properties.subscription.yaml:
kind: ConfigMap apiVersion: v1 metadata: name: customer-properties data: customer.properties: |- baseWebHookUrl={{ applicationStatusNotify.baseWebHookUrl.value }} applications.subscription.1.nretry=1 applications.subscription.1.timeout=10000 applications.subscription.1.retrydelay={{ applicationStatusNotify.retrydelay.value }} applications.subscription.1.tenantId={{ applicationStatusNotify.tenantId.value }}custom_property.conf.yml:
applicationStatusNotify.baseWebHookUrl.value=http://localhost:8082 applicationStatusNotify.retrydelay.value=2000 applicationStatusNotify.tenantId.value=tenantId # ...Собрать DataSpace.
Задать стендозависимые параметры.
Развернуть приложение.
Вызвать
/packetAPI DataSpace с командойcreateдля Event и командой установкиapplicationStatusв "APPROVED".{ "jsonrpc": "2.0", "method": "execute", "id": 1, "params": { "packet": { "commands": [ { "id": "updateApplication", "name": "update", "params": { "type": "Application", "id": "<идентификатор_заявки>", "applicationStatus": "APPROVED" } }, { "id": "createStatusChangeEvent", "name": "create", "params": { "type": "StatusChangeEvent", "application": "<идентификатор_заявки>", "reason": "заявка обработана", "eventUser": "<идентификатор_пользователя>" } } ] } } }