Руководство по работе с сервисом DataSpace Subscription#
О документе#
В составе продукта Platform V DataSpace (APT) поставляется сервис DataSpace Subscription, реализующий универсальный (повторно используемый разными потребителями) механизм, позволяющий DataSpace участвовать в событийном обмене (выступать в роли провайдера событий).
В данном документе приведено описание настройки сервиса DataSpace Subscription и работы с ним.
Расшифровку основных понятий см. в документе "Термины и определения".
Описание механизма Subscription#
События#
Предусмотрена возможность формирования событий (Event) по видам изменений:
Создание событий в пакете команд и mutation GraphQL.
Проведение операции слияния/разъединения для заданного класса.
Состав полей события и его привязку к другим объектам определяет потребитель DataSpace в модели своей предметной области при помощи объектов вида Event.
Подписки и публикация событий#
Предоставлена возможность создания подписки на события. В подписке задаются:
тип события;
способ публикации — через topic Kafka или через webhook (вызов REST API);
спецификация преобразования данных события в тело сообщения;
запрос (query), который должен быть выполнен при соответствующем событии для получения дополнительных данных для публикации;
дополнительный фильтр по данным события, если фильтр не вернул данные, то публикация не осуществляется;
таймаут и количество retry при публикации через webhook;
возможность определить дополнительные заголовки (headers), содержимое которых может быть задано явно или параметризовано данными события (Event).
Тело сообщения для публикации представляет собой JSON, формируемый по заданному в подписке преобразованию в JSON данных Event и полей из результирующего query, по умолчанию для преобразования JSON в JSON используется библиотека JOLT.
События слияния/разъединения#
Если сконфигурировать подписку на события слияния/разъединения, то при выполнении слияния/разъединения сформируется событие слияния/разъединения, которое будет опубликовано в соответствии с подпиской.
Для этого в модели предметной области необходимо объявить Event с признаком merge-event="true", связанный с классом, содержащим референс, обрабатываемый при слиянии/разъединении.
Такие Event не доступны для создания через пакет команд или mutation GraphQL. В качестве значения тенанта (атрибут ownerId) для таких событий устанавливается значение, равное значению тенанта того агрегата, в рамках которого произошли изменения.
Если процесс слияния/разъединения завершился успешно, то формируется Event со значением поля mergeFailed = false.
Предусмотрена возможность создания события в том случае, если процесс слияния/разъединения завершился с ошибкой, которая требует постороннего вмешательства.
За это отвечает настройка сервиса дедубликации duplication.service.enable-failed-merge-events. По умолчанию формирование таких событий выключено: duplication.service.enable-failed-merge-events=false.
Если перевести настройку duplication.service.enable-failed-merge-events сервиса дедубликации в значение true, то при возникновении ошибки процесса слияния/разъединения будут создаваться события со значением поля mergeFailed = true.
При этом также заполняется поле errorCode — код ошибки и errorMessage — сообщение об ошибке.
Виды ошибок, при которых формируется Event со значением поля mergeFailed = true предопределены.
Доступен вид ошибок UNIQ — ошибка нарушения уникальности.
Для подписки на неуспешные события слияния/разъединения необходимо использовать фильтр root.mergeFailed == true в поле criteria для подписки.
Примечание
Если функциональность обработки событий слияния/разъединения клиентов первый раз используется потребителем, начиная с версии компонентов DataSpace 1.11.0 и выше, и если настройка сервиса дедубликации
duplication.service.enable-failed-merge-eventsустановлена в значениеtrue, то для подписки на успешные события слияния/разъединения достаточно использовать фильтрroot.mergeFailed == falseв полеcriteriaдля подписки.Eсли данная функциональность использовалась до версии 1.11.0, и если настройка сервиса дедубликации
duplication.service.enable-failed-merge-eventsустановлена в значениеtrue, то следует использовать фильтрcoalesce(root.mergeFailed, false) != trueв полеcriteriaдля подписки.
Диаграммы событийного обмена#
Требования к модели#
В модели данных необходимо описать события (event), которые будут обрабатываться при работе сервиса DataSpace Subscription.
Событие может выступать как в роли самостоятельного агрегата, так и являться частью одного из существующих агрегатов в модели, для этого у события должна быть объявлена ссылка на родительский класс (parent="true"). Событие, у которого отсутствует родительская ссылка, является самостоятельным агрегатом.
Отличия Event от класса:
для него определена только операция создания;
служит только для подписки на него;
не наследуется и имеет плоскую структуру;
допустима ссылка на родительский класс для формирования агрегатной связи.
Event, у которого отсутствует родительская ссылка, является самостоятельным агрегатом;обработанный всеми подписками
Eventавтоматически удаляется по истечении времени;не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.
Пример описания события, являющегося самостоятельным агрегатом:
<event name="MyEvent">
<property name="message" type="String"/>
</event>
Пример события, которое входит в агрегат Product:
<model>
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
</class>
<event name="ProductEvent">
<property name="message" type="String"/>
<property name="product" type="Product" parent="true"/>
</event>
</model>
Отдельным видом событий являются события слияния/разъединения. В модели они обозначаются признаком merge-event="true". Для
событий слияния/разъединения обязательно наличие ссылки на родительский класс:
<model>
<external-types>
<external-type type="Client" merge-kind="Organization"/>
</external-types>
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
<property name="code" type="String" label="Код"/>
<property name="oneToOneLink" type="One2OneClassLink" mappedBy="product" label="o2o связь"/>
<property name="services" type="Service" collection="set" mappedBy="product" label="Коллекция сервисов"/>
</class>
<class name="Service" label="Сервис" lockable="true">
<property name="name" type="String" label="Наименование"/>
<property name="code" type="String" label="Код"/>
<property name="product" type="Product" label="продукт" parent="true" index="true"/>
</class>
<class name="One2OneClassLink" label="Класс для связи o2o">
<property name="name" type="String" label="Наименование"/>
<property name="product" type="ProductParty" parent="true"/>
</class>
<event name="ProductClientMergeEvent" merge-event="true">
<reference name="client" type="Client" label="Идентификатор клиента"/>
<property name="product" type="Product" parent="true"/>
</event>
</model>
Для свойств событий допустимо указание тегов: name, type, parent, label, length, precision, mandatory, default-value.
События слияния/разъединения создаются автоматически сервисом DataSpace ReferenceUpdater.
Спецификация подписок#
Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml. При сборке проекта файл попадает в конфигурацию DataSpace Subscription. При старте сервис DataSpace Subscription читает этот файл и загружает подписки в таблицу T_DSPC_SYS_SUBSCRIPTION.
Пример:
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="mergeSubscription1"
name="EPKMergeForProduct"
description="Публикация события с слиянии во внешнюю систему"
target="REST"
eventType="ProductClientMergeEvent"
callback="${my.merge.subscription.1.url}/api/v1/products"
validTill="20240101T00:00:00Z"
maxRetryAttempts="${my.merge.subscription.1.nretry}"
timeoutMs="${my.merge.subscription.1.timeout}"
retryDelayMs="${my.merge.subscription.1.retrydelay}"
async="false"
blocking="true"
idempotenceHeaderName="requestUID">
<criteria>
root.referenceName=='client' && coalesce(root.mergeFailed, false) != true
</criteria>
<query>
{
searchProduct(cond: "it.$id=='${product}'") {
elems {
code
name
services {
elems {
code
name
}
}
oneToOneLink {
name
}
}
}
}
</query>
<template>
[
{
"operation": "shift",
"spec": {
"data": {
"searchProduct": {
"elems": {
"*": {
"code": "ProductCodeMappedTo",
"oneToOneLink": {
"name": "oneToOneLinkNameMappedTo"
},
"services": {
"elems": {
"*": {
"code": "servicesMappedTo"
}
}
}
}
}
}
}
}
}
]
</template>
<headers>
-XTenantId=${my.merge.subscription.1.tenantId}
</headers>
</subscription>
</subscriptions>
Описание полей:
id— уникальный идентификатор подписки, используется в качестве primary key в таблице подписокT_DSPC_SYS_SUBSCRIPTION.Внимание!
Значение "0" зарезервировано системой.
name— наименование подписки.description— текстовое описание подписки.target— способ публикации (REST,KAFKA,GRAPHQLSUBS).eventType— тип событий (entity name), на которые осуществляется подписка.callback— URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_topic> в случае Kafka.validTill— время окончания действия подписки.maxRetryAttempts— количество повторных попыток публикации (актуально только для target == REST).timeoutMs— тайм-аут публикации, в миллисекундах (актуально только для target == REST).retryDelayMs— задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST).async— использование асинхронной отправки (true/false).blocking— приостановление отправки при первой ошибке (true/false).idempotenceHeaderName— наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности. При всех попытках отправки одного и того же сообщения заголовок принимает одинаковое значение. Если атрибут не задан или значение пустое — заголовок не формируется.criteria— строковое выражение для фильтрации записей события. С описанием формата строковых выражений можно ознакомиться в документе "Строковые выражения".query— текст GraphQL-запроса для чтения дополнительных данных. С GraphQL-API можно ознакомиться в документе "Протокол GraphQL". Текст запроса может содержать placeholder формата ${<наименование_свойства_события>}. Перед выполнением GraphQL-запроса, placeholder будут заменены на соответствующие значения свойств события. <наименование_свойства_события> — наименование свойства JPA-класса события entity,nameкоторого прописан в полеeventTypeу конкретного subscription в subscriptions.xml.template— спецификация трансформации JSON-данных в JSON-сообщение. Для трансформации по умолчанию используется библиотека JOLT, поэтому для описания преобразования в параметре template используется формат спецификации этой библиотеки. Исходные данные для трансформации по шаблону имеют общий вид:{ "event": { <атрибуты события> }, "data": { <результат выполнения graphql-запроса. Если graphql-запрос в подписке не задан или не вернул данных — элемент пустой> } }headers— спецификация дополнительных заголовков сообщения. Формат: <наименование_заголовка>=<значение>. <значение> может содержать Placeholder формата ${<наименование_свойства_события>}, которые будут заменены аналогично логике, описанной для поляquery.
Примечание
На старте сервиса DataSpace Subscription проводится валидация полей
criteria,query,template. Если валидация не проходит, сервис завершает свою работу.
Внимание!
Поля
criteria,query,templateиheadersне могут содержать пустое значение. Если не требуется задавать значение для данных полей, то не нужно указывать соответствующие теги.
Стендозависимая конфигурация subscriptions.xml#
Потребитель может использовать в значениях полей target, callback, template, headers, maxRetryAttempts, timeoutMs, retryDelayMs Placeholder, которые заполняться при загрузке модуля из конфигурации приложения. Конфигурация приложения может быть параметризована и сделана стендозависимой.
Для этого необходимо добавить в файл customer-properties.subscription.yaml список параметров со значениями. В качестве значения может быть использован также параметр развертывания, например, {{tenantId}}. Эти параметры должны быть перечислены также в файле custom_property.conf.yml, могут быть как со значениями по умолчанию, так и без значений.
При установке на стенд этим параметрам должны быть заданы значения в стендозависимой конфигурации.
Подробнее см. в документе "Руководство по установке" в следующих разделах:
"Добавление пользовательских параметров в дистрибутив".
"Использование пользовательских параметров в рамках сервиса DataSpace".
Пример customer-properties.subscription.yaml:
kind: ConfigMap
apiVersion: v1
metadata:
name: customer-properties
data:
customer.properties: |-
my.merge.subscription.1.url={{ customer.value1 }}
my.merge.subscription.1.nretry=1
my.merge.subscription.1.timeout={{ customer.value2 }}
my.merge.subscription.1.retrydelay=2000
my.merge.subscription.1.tenantId={{ customer.value3 }}
Пример custom_property.conf.yml:
customer.value1=http://localhost:8082
customer.value2=10000
customer.value3=tenantId
# ...
Гарантия последовательности публикации событий
Для гарантии последовательной отправки событий по агрегату необходимо для подписки:
выключить async-режим:
async="false";включить blocking-режим:
blocking="true".
Комбинацию async="true" blocking="true" использовать не следует, т.к. последовательность обработки в этом случае не гарантируется.
Процессы и масштабирование#
Основную логику сервиса DataSpace Subscription реализуют процессы EVENT_TRANSFER и EVENT_WORKER.
Задача процесса EVENT_TRANSFER состоит в том, чтобы для каждого события, по которому есть валидные подписки, сформировать записи в таблице очереди на публикацию T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего событие считается обработанным. При этом для всех выбранных новых Event (для которых нашлись валидные подписки и были созданы записи в T_DSPC_SYS_EVENT_QUEUE_ITEM, а также для которых подписок не нашлось) статус меняется на PROCESSED.
Процесс EVENT_WORKER обрабатывает таблицу очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM: читает соответствующее событие с применением условия criteria подписки, выполняет запрос дополнительных данных, используя query, трансформирует данные в итоговое сообщение по шаблону template и осуществляет публикацию сообщения на основании target и callback.
Предусмотрена возможность масштабирования процессов EVENT_TRANSFER и EVENT_WORKER. За количество процессов, которые будут запущены, отвечают настройки dataspace.subscription.event-transfer.transfer-processes-count и dataspace.subscription.event-worker.worker-processes-count соответственно. Реализована возможность распределения процессов по партициям для того, чтобы процессы обрабатывали каждый свой набор данных.
За количество партиций для процессов EVENT_TRANSFERи EVENT_WORKER отвечают настройки dataspace.subscription.event-transfer.partitions-number и dataspace.subscription.event-worker.partitions-number соответственно. Партиции ведутся в разрезе подписок. Для процессов EVENT_TRANSFER зарезервирован идентификатор подписки со значением "0".
Внимание!
Настройки количества партиций для процессов
EVENT_TRANSFERиEVENT_WORKERдолжны иметь одинаковые значение для всех экземпляров сервиса DataSpace Subscription, работающих с одной БД. При рассогласовании значений данных настроек между разными экземплярами процессы будут брать в работу один и тот же набор данных, что приведет к нарушению порядка обработки событий.Также запрещено менять данные настройки в runtime.
Служебная таблица T_DSPC_SYS_SUBSCRIPTION_WORKER содержит информацию о партициях в разрезе подписок. Недостающие партиции добавляет служебный процесс на старте сервиса. Процессы EVENT_TRANSFER и EVENT_WORKER в процессе своей работы проверяют таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER на предмет необрабатываемых партиций и берут их в работу, прописывая свой идентификатор в поле PROCESSID. Партиция считается необрабатываемой, если PROCESSID не установлен, либо ее heartbeat не обновлялся определенное время, которое задается в настройках dataspace.subscription.event-transfer.heartbeat-timeout-sec и dataspace.subscription.event-worker.heartbeat-timeout-sec для процессов EVENT_TRANSFER и EVENT_WORKER соответственно.
Набор данных разбивается по партициям при помощи предварительно рассчитанного хеша, который хранится в поле PARTITIONHASH в таблицах
событий и очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM. При обработке записей процессы EVENT_TRANSFER и EVENT_WORKER вычисляют партицию, к которой принадлежит запись по формуле PARTITIONHASH mod <количество_партиций>. Если полученная партиция присутствует среди партиций, назначенных процессу, то он выполняет обработку записи.
Таблица T_DSPC_SYS_SUBSCRIPTION_PROCESS также способствует управлению масштабированием процессов. При старте сервиса процессы регистрируются в данной таблице, записывая свой идентификатор в поле OBJECT_ID. В процессе работы процессы пересчитывают значение поля MAXPARTITIONS, которое содержит максимальное количество партиций, обрабатываемых процессом, по следующей формуле: <количество_подписок> * <количество_партиций> / <количество_процессов>. Если процессу назначено больше партиций, чем MAXPARTITIONS, то он освободит нужное количество.
Поскольку EVENT_WORKER может обрабатывать последовательно партиции разных подписок, то при сбое одного из получателей может возникнуть задержка обработки для других подписок.
Для того чтобы работоспособные получатели продолжали получать сообщения с неизменной скоростью, реализован специальный тип процессов — ERROR_WORKER.
Алгоритм действия сервиса при возникновении ошибки по одной из партиций подписки:
Все партиции подписки, при обработке которой произошла ошибка,
EVENT_WORKERпереключает в состояниеstate = ERRORв таблицеT_DSPC_SYS_SUBSCRIPTION_WORKERи перестает их обрабатывать, установив значениеprocessId = null.Процесс
ERROR_WORKERвыбирает для обработки партиции с состояниемERRORилиCIRCUIT_BREAKING, устанавливая свойprocessIdдля соответствующих партиций.Процесс
ERROR_WORKERобрабатывает полученные партиции по алгоритму, аналогичномуEVENT_WORKER.Если очередное событие было успешно обработано, то
ERROR_WORKERдолжен переключить партицию в состояниеstate = UNASSIGNEDи установитьprocessId = nullдля того, чтобы ее снова подхватилEVENT_WORKER.
При этом за время восстановления в очереди на отправку могло накопиться значительное число событий, что также может повлиять на скорость отправки по тем подпискам, по которым ошибки отсутствуют.
Поэтому перед переключением партиции ERROR_WORKER анализирует размер очереди по партиции и, если он выше порогового значения (dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold), то ERROR_WORKER продолжает обрабатывать партицию, пока размер очереди не станет ниже порогового значения.
Для процессов ERROR_WORKER также предусмотрена возможность масштабирования:
За количество процессов, которые будут запущены, отвечает настройка
dataspace.subscription.event-worker.error-worker-processes-count.Количество партиций для процессов
ERROR_WORKERравно количеству партиций, установленному для процессовEVENT_WORKER(dataspace.subscription.event-worker.partitions-number).
Служебные таблицы#
T_DSPC_SYS_SUBSCRIPTION
Таблица подписок. Данные загружаются из файла subscriptions.xml при старте сервиса.
Поле |
Описание |
|---|---|
OBJECT_ID |
идентификатор подписки, задает пользователь (значение "0" зарезервировано системой) |
NAME |
имя подписки (для отображения, включая labels мониторинга) |
DESCRIPTION |
описание подписки |
EVENTTYPE |
entity name событий, на которые осуществляется подписка |
CRITERIA |
строковое выражение для фильтрации записей события |
QUERY |
текст GraphQL-запроса для чтения дополнительных данных |
TEMPLATE |
спецификация трансформации JSON-данных в JSON-сообщение |
TARGET |
способ публикации (REST, KAFKA, GRAPHQLSUBS) |
CALLBACK |
URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_topic> в случае Kafka |
HEADERS |
спецификация дополнительных заголовков сообщения |
IDEMPOTENCEHEADERNAME |
наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности |
ASYNC |
признак использования асинхронной отправки (true/false) |
BLOCKING |
признак приостановки отправки при первой ошибке (true/false) |
MAXRETRYATTEMPTS |
количество повторных попыток публикации (актуально только для target == REST) |
RETRYDELAYMS |
задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST) |
TIMEOUTMS |
тайм-аут публикации, в миллисекундах (актуально только для target == REST) |
VALIDTILL |
время окончания действия подписки |
SYS_OWNERID |
владелец подписки (ID тенанта) |
SYS_LASTCHANGEDATE |
дата последнего изменения записи |
T_DSPC_SYS_EVENT_QUEUE_ITEM
Таблица очереди обработки событий. Заполняется процессами EVENT_TRANSFER. Обрабатывается процессами EVENT_WORKER.
Поле |
Описание |
|---|---|
OBJECTID_EVENTID |
ссылка на событие (часть первичного ключа) |
OBJECTID_SUBSCRIPTIONID |
ссылка на подписку (часть первичного ключа) |
EVENTTYPE |
entity name события |
STATUS |
статус обработки элемента очереди событий (NEW, SENT, ERROR, SKIP) |
AGGREGATEROOTTYPE |
тип корня агрегата Event, определяется по метаданным |
AGGREGATEROOTID |
идентификатор корня агрегата события |
PARTITIONHASH |
предварительно рассчитанный хеш для деления по партициям |
IDEMPOTENCEHEADER |
UUID-значение, формируемое при создании публикации и передаваемое в специальном заголовке при публикации сообщения |
CREATIONTIMESTAMP |
время добавления события в очередь |
SYS_LASTCHANGEDATE |
дата последнего изменения записи |
Значения поля STATUS:
NEW— новый элемент очереди событий, еще не был обработан;SENT— элемент очереди событий успешно обработан, сообщение отправлено;ERROR— произошла ошибка при обработке, элемент будет обработан повторно;SKIP— подходящее событие не найдено в процессе обработки элемента очереди событий (скорее всего событие не прошло по условию, указанному в поле criteria подписки).
T_DSPC_SYS_SUBSCRIPTION_WORKER
Поле |
Описание |
|---|---|
OBJECTID_PARTITIONNUMBER |
номер партиции (часть первичного ключа) |
OBJECTID_SUBSCRIPTIONID |
ссылка на подписку (часть первичного ключа) |
HEARTBEAT |
время последнего обновления |
PROCESSID |
идентификатор процесса |
STATE |
состояние обработки партиции (ACTIVE, CIRCUIT_BREAKING, UNASSIGNED, ERROR) |
T_DSPC_SYS_SUBSCRIPTION_PROCESS
Поле |
Описание |
|---|---|
OBJECT_ID |
идентификатор процесса (первичный ключ) |
MAXPARTITIONS |
максимальное количество партиций, обрабатываемых процессом |
KIND |
вид процесса (worker/перекладчик) (EVENT_WORKER, EVENT_TRANSFER) |
HEARTBEAT |
время последнего обновления |
Конфигурация#
dataspace.subscription.idempotenceHeaderUuidWithHyphens
#Значение по умолчанию: "true"
#true — UUID для заголовка идемпотентности idempotenceHeaderName формируется с дефисами, в результате его размер равен 36 символам
#false — UUID для заголовка идемпотентности idempotenceHeaderName формируется без дефисов, в результате его размер равен 32 символам
dataspace.subscription.event-transfer.transfer-processes-count
#Значение по умолчанию: 1
#Количество процессов перекладчика (EVENT_TRANSFER)
dataspace.subscription.event-transfer.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы перекладчика
dataspace.subscription.event-transfer.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов перекладчика (EVENT_TRANSFER)
dataspace.subscription.event-transfer.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс перекладчика
dataspace.subscription.event-transfer.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс перекладчика будет считаться неактивным
dataspace.subscription.event-transfer.max-transfer-count
#Значение по умолчанию: 1000
#Количество событий, обрабатываемых перекладчиком за один цикл работы
dataspace.subscription.event-worker.worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER
dataspace.subscription.event-worker.error-worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER, обрабатывающего ошибки (ERROR_WORKER)
dataspace.subscription.event-worker.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы процесса EVENT_WORKER
dataspace.subscription.event-worker.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов EVENT_WORKER
dataspace.subscription.event-worker.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс EVENT_WORKER
dataspace.subscription.event-worker.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс EVENT_WORKER будет считаться неактивным
dataspace.subscription.event-worker.circuit-breaker-error-count-threshold
#Значение по умолчанию: 10
#Количество ошибок по подписке, по достижении которого включается circuit breaker на эту подписку
dataspace.subscription.event-worker.circuit-breaker-timeout-ms
#Значение по умолчанию: 30000 миллисекунд
#Время с момента включения circuit breaker на подписку, по истечении которого circuit breaker будет выключен
dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold
#Значение по умолчанию: 100
#Пороговое значение размера очереди накопившихся сообщений по подписке
#Пока размер очереди выше указанного значения порога, `ERROR_WORKER` не будет переключать партицию обратно на `EVENT_WORKER`
dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker
#Значение по умолчанию: 5
#Максимальное количество потоков (на процесс) в пуле потоков, используемом для асинхронной отправки
#Результирующее максимальное количество потоков равно:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)
dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker
#Значение по умолчанию: 1000
#Максимальное количество элементов в очереди (на процесс) пула потоков, используемом для асинхронной отправки
#Результирующий размер очереди равен:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)
dataspace.subscription.service-tasks.pool-size
#Значение по умолчанию: 5
#Количество потоков в пуле служебных процессов
dataspace.subscription.service-tasks.delete-dead-processes-schedule-delay-ms
#Значение по умолчанию: 10000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который удаляет неактивные процессы EVENT_TRANSFER и
#EVENT_WORKER из таблицы T_DSPC_SYS_SUBSCRIPTION_PROCESS
dataspace.subscription.service-tasks.load-subscriptions-schedule-delay-ms
#Значение по умолчанию: 1000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который выполняет загрузку подписок из файла subscriptions.xml
#в таблицу T_DSPC_SYS_SUBSCRIPTION, а также вставку недостающих партиций в таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER
#После успешной загрузки процесс перестает запускаться
dataspace.subscription.service-tasks.cleanup-depth-hr
#Значение по умолчанию: 48 часов
#Максимальное время хранения записей в таблицах событий и таблице очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего
#записи будут удалены фоновым процессом, если они уже были обработаны: status == PROCESSED для событий
#и status == SENT для записей T_DSPC_SYS_EVENT_QUEUE_ITEM
dataspace.subscription.service-tasks.cleanup-cron
#Значение по умолчанию: 0 0 0 * * * (каждый день в 00:00)
#Расписание запуска служебного процесса, который удаляет отработанные старые записи из таблиц событий и
#таблицы очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM
dataspace.subscription.service-tasks.tasks-schedule-initial-delay-ms
#Значение по умолчанию: 90000 миллисекунд
#Задержка перед запуском всех фоновых процессов на старте сервиса. Применяется для обеспечения корректной установки health-метрики
#readiness на старте сервиса.
dataspace.subscription.xml.enable-check-on-start
#Значение по умолчанию: true
#Включает/выключает валидацию полей query, criteria, template у подписок, описанных в subscription.xml при старте сервиса
#Только для стендов разработки! При эксплуатации в промышленных условиях должна быть установлена в значение "true"
Если предполагается использование функциональности публикации событий в Kafka, то необходимо задать настройки для каждого кластера Kafka в файле customer-properties.subscription.yaml в следующем виде:
Примечание
В случае использовании Helm-инсталляции в рамках раздела "Ручная установка сервиса" документа "Руководство по установке" необходимо создать
kind: Secretв среде контейнеризации с параметрами, указанными ниже, и задать имя полученного секрета в параметр в Values.yml:appConfig: subscription: config: "cm-dspc-subscriptionstub"
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.bootstrap-servers=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.key-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.value-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.acks=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.delivery-timeout-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.request-timeout-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.retry-backoff-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.linger-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotence=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.max-in-flight-requests-per-connection=
#Если требуется SSL:
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.security-protocol=SSL
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-key-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-location=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-location=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-endpoint-identification-algorithm=
При необходимости параметры можно сделать стендозависимыми по аналогии со стендозависимой конфигурацией subscriptions.xml (см. раздел "Стендозависимая конфигурация subscriptions.xml").
Пример customer-properties.subscription.yaml:
kind: ConfigMap
apiVersion: v1
metadata:
name: customer-properties
data:
customer.properties: |-
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.bootstrap-servers={{ local.kafka.bootstrap.servers }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.acks={{ local.kafka.acks }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.delivery-timeout-ms={{ local.kafka.delivery.timeout.ms }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.request-timeout-ms=200
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.retry-backoff-ms=500
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.linger-ms={{ local.kafka.linger.ms }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.enable-idempotence={{ local.kafka.enable.idempotence }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.max-in-flight-requests-per-connection={{ local.kafka.max.in.flight.requests.per.connection }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.security-protocol=SSL
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-key-password=
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-location=/deployments/credentials/kafkaSslKeystore
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-password=12456
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-location=/deployments/credentials/kafkaSslTrustwtore
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-password=54321
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-endpoint-identification-algorithm=
Пример custom_property.conf.yml:
local.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
local.kafka.acks=all
local.kafka.delivery.timeout.ms=5000
local.kafka.linger.ms=0
local.kafka.enable.idempotence=true
local.kafka.max.in.flight.requests.per.connection=5
# ...
Внимание!
При использовании SSL установка значения "true" для параметра
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotenceможет приводить к ошибкеorg.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. Для того чтобы этого избежать, необходимо корректным образом настроить операциюIdempotentWriteдля кластера. Подробнее см. в официальной документации продукта Kafka.
В файле subscriptions.xml target необходимо задавать в формате <наименование_кластера>:<наименование_topic>.
Пример: LOCAL_KAFKA:test-topic.
Retry#
Параметры retry/timeout для Kafka задаются в настройках Kafka на конкретный кластер <наименование_кластера>. Для этого необходимо использовать параметры delivery-timeout-ms, request-timeout-ms, retry-backoff-ms.
Параметры retry/timeout для REST берутся из параметров подписки: maxRetryAttempts, timeoutMs, retryDelayMs. Для ошибок с http status code 4xx retry не выполняется (то есть retry — только для 5xx и в случае тайм-аута).
Для идемпотентности потребитель может определить необходимый заголовок (передавая какое-либо из полей event с уникальным значением).
Circuit breaking#
На уровне параметра подписки blocking вводится настройка режима обработки ошибки:
blocking=true— при первой ошибке запрет на отправку остальных публикаций по партиции подписки.blocking=false— разрешена публикация других событий по подписке до наступления условия приостановки.
Поведение при ошибках:
Производится фиксация ошибки. Статус "ERROR" устанавливается для элемента очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM.
Если параметр
blocking=false, то продолжается обработка следующих сообщений, при этом счетчик ошибок по подписке увеличивается.Если параметр
blocking=true, то полностью приостанавливается отправка по этой партиции, при этом счетчик ошибок по подписке увеличивается.При накоплении порогового значения ошибок по подписке (настройка
dataspace.subscription.event-worker.circuit-breaker-error-count-thresholdравна, например, "10") приостанавливается отправка по этой подписке. При этом включается circuit breaker на всю подписку на один экземпляр приложения.Спустя время, указанное в настройке
dataspace.subscription.event-worker.circuit-breaker-timeout-ms, производится попытка повторной отправки, начиная с тех, чья отправка завершилась с ошибкой.Успешная отправка сбрасывает счетчик ошибок.
Примечание
Каждый процесс EVENT_WORKER самостоятельно принимает решение о приостановке и возобновлении обработки партиции. Для того чтобы не останавливать весь поток при ошибке в данных одной из публикаций приостановится только одна партиция.
Мониторинг#
Мониторинг включается настройкой:
dataspace.subscription.metrics.enabled
#"false" — метрики выключены (значение по умолчанию)
#"true" — метрики включены
Реализованы следующие метрики мониторинга:
Наименование метрики |
Тип метрики |
Теги |
Описание |
|---|---|---|---|
dspc.subscription.handle.time |
Timer |
|
Время обработки событий в разрезе подписки и результата отправки |
dspc.subscription.publication.time |
Timer |
|
Время отправки сообщений в разрезе подписки и результата отправки |
dspc.subscription.errors.count |
Counter |
|
Количество ошибок отправки событий в разрезе подписки |
dspc.subscription.skip.count |
Counter |
|
Количество отсеянных событий в разрезе подписки |
dspc.subscription.transfer.count |
Counter |
Скорость перекладки событий |
|
dspc.subscription.partition.state |
Gauge |
|
Статус обработки партиций — количество по T_DSPC_SYS_SUBSCRIPTION_WORKER в разрезе state и subscriptionId |
dspc.subscription.event.queue.size |
Gauge |
|
Количество элементов очереди обработки событий в таблице T_DSPC_SYS_EVENT_QUEUE_ITEM в разрезе подписок и статусов. |
dspc.subscription.new.events.count |
Gauge |
|
Отображает количество событий со статусом |
Метрики dspc.subscription.partition.state, dspc.subscription.event.queue.size и dspc.subscription.new.events.count обновляются фоновым scheduled-процессом. Частота обновления
данных метрик регулируется настройкой:
dataspace.subscription.metrics.schedule-delay-ms
#Значение по умолчанию — 10000 миллисекунд
API повторной отправки событий#
REST API POST /admin/api/subscriptions/<id>/resend позволяет повторно отправить сообщения для событий, которые были уже обработаны ранее.
Внимание!
Переотправка в topic Kafka допустима только при единственной системе, читающей topic, по инициативе этой системы. Все остальные читатели могут получить ранее обработанные ими данные, в результате чего могут образоваться дубли.
Примечание
Переотправка возможна только для тех событий, по которым присутствуют записи в таблицах событий и очереди публикации (которые не были очищены). Необходимо задавать глубину очистки очередей событий с учетом требований по их возможной переотправке.
Примечание
При наличии в подписке GraphQL-query при повторной отправке он будет выполнен повторно на текущих данных. Как следствие — данные публикации могут отличаться от предыдущего вызова. Если для каких-то данных это критично, то необходимо завести соответствующие атрибуты в самом событии и заполнять их при создании события.
Параметры:
id— идентификатор подписки;тело запроса формата json:
{ "filter" : "root.fromReference=='2' && root.referenceName=='client'", "timeRange" : { "from" : "2023-11-29T10:39:46.587", "to" : "2023-12-01T10:39:46.587" } }filter— фильтр по атрибутам события, написанный на языке строковых выражений;timeRange.from,timeRange.to— временной диапазон (учитывается полеcreationTimestampсущности события) для выбора событий, сообщения по которым будут отправлены повторно.
Требования к запросу:
Один из параметров
filterилиtimeRangeобязательно должен быть заполнен, иначе API вернет ошибку405 Method Not Allowedс сообщением "Не задано ни одного условия отбора событий".Если задан параметр
timeRange, тоtoиfromобязательны к заполнению. Еслиtoилиfromне заполнены, то API вернет ошибку400 Bad Requestс сообщением "Поле timeRange#from обязательно для заполнения" или "Поле timeRange#to обязательно для заполнения".
При успешном вызове API возвращает код 200 OK и количество событий, по которым были повторно отправлены сообщения. Если не было найдено не одного события, подходящего под условие выборки, сформированного на основе переданных параметров, то API вернет ошибку 404 Not Found c сообщением "Не найдено подходящих событий для переотправки".
Пример вызова (на основе subscriptions.xml, описанного выше):
curl -d @data.json -H "Content-Type: application/json" -X POST http://localhost:8090/admin/api/subscriptions/mergeSubscription1/resend
data.json:
{
"filter" : "root.$id $in ['7294634218387406849','7294634226977341441'] && root.fromReference=='2' && root.referenceName=='client'",
"timeRange" :
{
"from" : "2023-11-29T10:39:46.587",
"to" : "2023-12-01T10:39:46.587"
}
}
Пример ответа:
5
Внимание!
Если параметры подписки в subscriptions.xml были изменены после первичной публикации события, то при повторной отправке:
сообщение будет направлено на новый url/топик, если были изменены поля
targetи/илиcallback;формат сообщения изменится, если было изменено поле
template;данные сообщения изменятся, если было изменено поле
query;сообщение не будет отправлено, если было изменено поле
criteriaи событие теперь не подходит под новое условие;сообщение будет содержать новый состав заголовков, если было изменено поле
headers.Если на момент первичной публикации события для подписки не был задан атрибут
idempotenceHeaderName, то и при повторной отправке соответствующий заголовок передан не будет, даже если к тому моменту атрибутidempotenceHeaderNameбудет добавлен в подписку.
Клиентский путь#
Порядок действий:
Определить список событий для публикации.
Добавить в проекте DataSpace в модель данных model.xml соответствующие этим событиям Event с необходимым для публикации набором атрибутов.
Создать в проекте DataSpace конфигурацию подписок subscriptions.xml, предусмотрев необходимые параметры для стендозависимой параметризации.
В конфигурации подписок описать шаблон трансформации данных события (и результата выполнения
query) в JSON для публикации.Сформировать файлы
customer-properties.subscription.yamlиcustom_property.conf.yml, тем самым определив стендозависимую конфигурацию.Выполнить сборку проекта DataSpace.
Доработать приложение для создания событий в необходимых узлах бизнес-процесса (вместе с изменением бизнес-данных или независимо).
Подготовить интеграционное окружение для публикации: кластер Kafka, topics, манифесты Kubernetes и Istio для сетевых взаимодействий, конфигурации Secret Management System для получения и ротации сертификатов и другое.
Развернуть DataSpace и приложение, протестировать интеграцию.
Провести нагрузочное тестирование, по результатам уточнить параметры масштабирования сервиса.
Выпустить релиз приложения совместно с DataSpace.
Пример передачи в "Каталог продуктов" сведений об изменении клиента по продукту при слиянии/разъединении#
Входное состояние: приложение развернуто с DataSpace, настроено слияние/разъединение Master Data Management.
Порядок действий:
Создать модель с событием слияния. model.xml:
<model name="rko-product" version="0.1"> <external-types> <external-type type="ClientEPK" merge-kind="Organization"/> </external-types> <class name="RKOProduct"> <property name="curISO" type="String" length="3"/> <property name="contractNumber" type="String" length="64"/> <reference name="client" type="ClientEPK" description="По этой ссылке выполняется слияние"/> </class> <!-- добавляем в модель событие слияния/разъединения client в RKOProduct --> <event name="RKOMergeEvent" merge-event="true"> <property name="rko" type="RKOProduct" parent="true"/> </event> </model>Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:
<subscriptions xmlns="DataspaceSubscriptions"> <subscription id="mergeRKOForPPK" name="MergeRKOForPPK" description="Событие о слиянии клиента продукта для каталога продуктов" target="KAFKA" eventType="RKOMergeEvent" callback="PPK_KAFKA:${rko.merge.subscription.1.topic}" validTill="9999-12-31T23:59:59.999Z" async="false" blocking="true"> <criteria> coalesce(root.mergeFailed, false) != true </criteria> <query> { searchRKOProduct(cond: "it.$id==''${rko}''") { elems { id contractNumber curISO client { entityId } } } } </query> <!-- структура данных на входе преобразования --> <!-- { "event": { "objectId": "someEventId11111 ", "creationTimestamp": "2023-04-01T22:22:22.001Z", "type": "RKOMergeEvent", "status": "processed", "lastChangeDate": "2023-04-01T22:22:23.551Z", "ownerId": "someOwnerId01", "rko": "1231415534646745", "aggregateRootId": "1231415534646745", "mergeKind": "Organization", "referenceName": "client", "fromReference": "1999449494944077", "toReference": "1999449494944942" }, "data": { "searchRKOProduct": { "elems": { "id": "1231415534646745", "contractNumber": "123141553464", "curISO": "RUB", "client": "1999449494944942" } } } } --> <!-- Ожидаемый результат --> <!-- { "Contract": { "ProductCode": "RKO", "ContractID": "1231415534646745", "epkOrgId": "1999449494944942", "ContractNumber": "123141553464", "CurrencyIso": "RUB" } } В примере преобразования в качестве ProductCode подставляется константа "RKO" --> <template> [ { "operation": "shift", "spec": { "event": { "rko": "Contract.ContractID", "toReference": "Contract.epkOrgId" }, "data": { "searchRKOProduct": { "elems": { "*": { "contractNumber": "Contract.ContractNumber", "curISO": "Contract.CurrencyIso" } } } } } }, { "operation": "default", "spec": { "Contract": { "ProductCode": "RKO" } } } ] </template> <headers> -XTenantId=${rko.merge.subscription.1.tenantId} </headers> </subscription> </subscriptions>Настроить подключение к Kafka для публикации событий.
Определить стендозависимую конфигурацию. customer-properties.subscription.yaml:
kind: ConfigMap apiVersion: v1 metadata: name: customer-properties data: customer.properties: |- rko.merge.subscription.1.topic={{ mergeRKOForPPK.topic.value }} rko.merge.subscription.1.tenantId={{ mergeRKOForPPK.tenantId.value }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.bootstrap-servers={{ mergeRKOForPPK.kafka.bootstrap.servers }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer dataspace.subscription.publisher.kafka.config.PPK_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer dataspace.subscription.publisher.kafka.config.PPK_KAFKA.acks={{ mergeRKOForPPK.kafka.acks }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.delivery-timeout-ms={{ mergeRKOForPPK.kafka.delivery.timeout.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.request-timeout-ms={{ mergeRKOForPPK.kafka.request.timeout.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.retry-backoff-ms={{ mergeRKOForPPK.kafka.retry.backoff.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.linger-ms={{ mergeRKOForPPK.kafka.linger.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.enable-idempotence={{ mergeRKOForPPK.kafka.enable.idempotence }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.max-in-flight-requests-per-connection={{ mergeRKOForPPK.kafka.max.in.flight.requests.per.connection }}custom_property.conf.yml:
mergeRKOForPPK.topic.value=topic-name mergeRKOForPPK.tenantId.value=tenantId mergeRKOForPPK.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka> mergeRKOForPPK.kafka.acks=all mergeRKOForPPK.kafka.delivery.timeout.ms=5000 mergeRKOForPPK.kafka.request.timeout.ms=200 mergeRKOForPPK.kafka.retry.backoff.ms=500 mergeRKOForPPK.kafka.linger.ms=0 mergeRKOForPPK.kafka.enable.idempotence=true mergeRKOForPPK.kafka.max.in.flight.requests.per.connection=5 # ...Собрать и развернуть приложение.
Выполнить слияние.
Пример уведомления о состоянии обработки заявки#
Порядок действий:
Создать модель с событием изменения статуса заявки. model.xml:
<model name="rko-product" version="0.2"> <external-types> <external-type type="ClientEPK" merge-kind="Individual"/> </external-types> <enum name="ApplicationStatus"> <value name="DRAFT"/> <value name="SUBMITTED"/> <value name="APPROVED"/> <value name="REJECTED"/> <value name="CANCELLED"/> </enum> <class name="Application"> <property name="code" type="String" length="32"/> <property name="name" type="String" length="254"/> <property name="ref" type="String" length="64" unique="true"/> <reference name="client" type="ClientEPK" description="По этому референсу выполняется слияние/разъединение"/> <property name="content" type="Text"/> <property name="applicationStatus" type="String" length="32"/> </class> <!-- добавляем в модель событие изменения статуса заявки --> <event name="StatusChangeEvent"> <property name="application" type="Application" parent="true" /> <property name="reason" type="String" length="2048" description="Причина изменения статуса"/> <property name="eventUser" type="String" length="254" description="Идентификатор пользователя, действия которого привели к изменению статуса" /> </event> </model>Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:
<subscriptions xmlns="DataspaceSubscriptions"> <subscription id="applicationStatusNotify" name="applicationStatusNotify" description="Событие об изменении статуса заявки" target="REST" eventType="StatusChangeEvent" callback="${baseWebHookUrl}/api/v1/statusNotify" validTill="9999-12-31T23:59:59.999Z" maxRetryAttempts="${applications.subscription.1.nretry}" timeoutMs="${applications.subscription.1.timeout}" retryDelayMs="${applications.subscription.1.retrydelay}" async="true" blocking="true"> <query> { searchApplication(cond: "it.$id=='${application}'") { elems { id code name ref applicationStatus } } } </query> <!-- структура данных на входе преобразования --> <!-- { "event": { "objectId": "someEventId555", "creationTimestamp": "2023-04-01T22:22:22.001Z", "type": "StatusChangeEvent", "status": "processed", "lastChangeDate": "2023-04-01T22:22:23.551Z", "ownerId": "someOwnerId02", "application": "1231415534646745", "aggregateRootId": "1231415534646745", "reason": "Application successfully passed all checks and has been approved", "eventUser": "xxx-uuuuuuu-zzz" }, "data": { "searchApplication": { "elems": { "id": "1231415534646745", "code": "APP-X333", "name": "Application for grant", "ref": "APP-X333-113333332", "applicationStatus": "APPROVED" } } } } --> <!-- Ожидаемый результат --> <!-- { "ApplicationReference": "APP-X333-113333332", "ApplicationCode": "APP-X333", "ApplicationName": "Application for grant", "Status": "APPROVED", "Reason": "Application successfully passed all checks and has been approved", "Timestamp": "2023-04-01T22:22:22.001Z" } --> <template> [ { "operation": "shift", "spec": { "event": { "reason": "Reason" "creationTimeStamp": "Timestamp" }, "data": { "searchApplication": { "elems": { "*": { "code": "ApplicationCode", "name": "ApplicationName", "ref": "ApplicationReference", "applicationStatus": "Status" } } } } } } ] </template> <headers> -XTenantId=${applications.subscription.1.tenantId} -XchangeUser=${eventUser} </headers> </subscription> </subscriptions>Сконфигурировать подключение к REST API для публикации.
Определить стендозависимую конфигурацию. customer-properties.subscription.yaml:
kind: ConfigMap apiVersion: v1 metadata: name: customer-properties data: customer.properties: |- baseWebHookUrl={{ applicationStatusNotify.baseWebHookUrl.value }} applications.subscription.1.nretry=1 applications.subscription.1.timeout=10000 applications.subscription.1.retrydelay={{ applicationStatusNotify.retrydelay.value }} applications.subscription.1.tenantId={{ applicationStatusNotify.tenantId.value }}custom_property.conf.yml:
applicationStatusNotify.baseWebHookUrl.value=http://localhost:8082 applicationStatusNotify.retrydelay.value=2000 applicationStatusNotify.tenantId.value=tenantId # ...Собрать DataSpace.
Задать стендозависимые параметры.
Развернуть приложение.
Вызвать
/packetAPI DataSpace с командойcreateдля Event и командой установкиapplicationStatusв "APPROVED".{ "jsonrpc": "2.0", "method": "execute", "id": 1, "params": { "packet": { "commands": [ { "id": "updateApplication", "name": "update", "params": { "type": "Application", "id": "<идентификатор_заявки>", "applicationStatus": "APPROVED" } }, { "id": "createStatusChangeEvent", "name": "create", "params": { "type": "StatusChangeEvent", "application": "<идентификатор_заявки>", "reason": "заявка обработана", "eventUser": "<идентификатор_пользователя>" } } ] } } }