Руководство по работе с сервисом DataSpace Subscription#
О документе#
В составе продукта Platform V DataSpace (APT) поставляется сервис DataSpace Subscription, реализующий универсальный (повторно используемый разными потребителями) механизм, позволяющий DataSpace участвовать в событийном обмене (выступать в роли провайдера событий).
В данном документе приведено описание настройки сервиса DataSpace Subscription и работы с ним.
Расшифровку основных понятий см. в перечне «Термины и определения».
Описание механизма Subscription#
События#
Предусмотрена возможность формирования событий (event) по видам изменений:
Явное создание объекта события в пакете команд и mutation GraphQL.
Проведение операции слияния/разъединения для заданного класса.
Изменение заданного набора свойств сущности средствами DataSpace.
Создание и удаление, а также изменение любого или заданного набора свойств сущности средствами DataSpace.
Создание записи в истории для историцируемой сущности при ее изменении средствами DataSpace.
Состав полей события и его привязку к другим объектам определяет потребитель DataSpace в модели своей предметной области при помощи элементов event.
Подписки и публикация событий#
Для публикации событий в другие системы создаются подписки на события. В подписке задаются следующие параметры:
тип события;
способ публикации — через topic Kafka или через webhook (вызов REST API);
спецификация преобразования данных события в тело сообщения;
GraphQL-запрос (
query), который должен быть выполнен при публикации соответствующего события для получения дополнительных данных с целью публикации;дополнительный фильтр по данным события (если фильтр не вернул данные, то публикация не осуществляется);
тайм-аут и количество retry при публикации через webhook;
возможность определения дополнительных заголовков (
headers), содержимое которых может быть задано явно или параметризовано данными события (event).
Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml (см. раздел «Спецификация подписок»).
Тело сообщения для публикации представляет собой JSON, формируемый по заданному в подписке преобразованию из JSON-данных event и полей из результирующего query. По умолчанию для преобразования из JSON в JSON используется библиотека JOLT.
Примечание
При необходимости можно воспользоваться демо-сайтом JOLT трансформации.
Помимо публикации в другие системы можно подписаться на события с использованием GraphQL subscription API. Описание приведено в разделе «Подписка» руководства «Протокол GraphQL».
Порядок подключения функциональности subscription с примерами приведен в разделе «Клиентский путь».
Диаграмма событийного обмена#
Виды событий#
Существует два основных вида событий: формируемые явно (далее — явные события) и формируемые автоматически (далее — автоматические события).
Явные события#
Явные события используются с целью создания в транзакции произвольных событий с собственными наборами данных для их последующей публикации.
Событие необходимо создавать явно в составе пакета команд.
Данный вид событий объявляется в модели данных при помощи элемента event.
Для события в модели данных объявляется набор его свойств, которым при создании задаются значения.
Автоматические события#
Автоматические события создаются DataSpace неявно при выполнении определенных действий над данными, которые зависят от типа события.
Чтобы DataSpace создал автоматическое событие, его необходимо объявить в модели данных в качестве дочернего события к сущности, действия над которой требуется отслеживать. Такое событие может быть унаследовано от родительского события одного из базовых типов:
BaseMergeEvent, BaseChangeEvent, BaseObjectEvent, BaseTrackingEvent, BaseSnapshotEvent, BaseHistoryEvent. Базовый тип определяет условия для формирования события:
Родительский тип события |
Действие, приводящее к формированию события |
Решаемая задача |
Пример |
Состав данных события |
|---|---|---|---|---|
|
Изменение данных при обработке события слияния/разъединения |
Информирование о произведенных фоновым процессом DataSpace изменениях в данных |
При дедубликации клиентов необходимо производить перепривязку продуктов в CRМ-системе. В модели данных для сущности |
Метка времени изменения, версия объекта, ссылка на родительскую сущность, имя модифицированного поля, его новое и предыдущее значения |
|
Создание записи в истории сущности |
Публикация истории изменений сущности без дублирования данных истории в самом событии |
Для сущности |
Метка времени изменения, версия объекта, ссылка на родительскую сущность |
|
Изменение заданных полей объекта |
Мониторинг изменений полей |
На экране отображаются котировки акций. Необходимо их обновлять в реальном времени. В модели данных к сущности |
Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта |
|
Создание и удаление объекта, изменение любого поля |
Репликация объекта |
Поисковый движок используется для полнотекстового и векторного поиска объектов, ведущихся в DataSpace. Необходимо поддерживать в нем актуальное состояние объектов. В модели данных к сущности создается дочерний |
Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта, действие — Create/Update/Delete |
|
Создание и удаление объекта, изменение заданных полей |
Аудит изменения избранных полей |
В составе объекта есть несколько значимых атрибутов, все изменения по которым надо отслеживать. Например, остаток по счету. Создается дочерний |
Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта, действие — Create/Update/Delete, идентификатор пользователя, значения заданных полей сущности |
|
Создание и удаление объекта, изменение любого поля |
Аудит изменения объекта. Снапшоты версий объекта. Undo/Redo |
Требуется организовать аудит изменения данных. Для интересующих сущностей создаются дочерние события |
Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта, действие — Create/Update/Delete, идентификатор пользователя, значения всех полей сущности (кроме коллекций) |
Для отсылки к полям родительской сущности используется специальный элемент модели — parents-property.
Для отслеживания изменения полей родительской сущности в событиях, унаследованных от BaseChangeEvent, эти поля объявляют при помощи parents-property.
В событиях, унаследованных от BaseTrackingEvent, для этих полей будут созданы собственные поля события, в которых будут сохранены значения по завершении транзакции.
В событиях, унаследованных от BaseSnapshotEvent, будут созданы собственные поля для всех неколлекционных полей родительской сущности, в которых будет сохранено состояние объекта по завершении транзакции.
В событиях, унаследованных от BaseTrackingEvent и BaseSnapshotEvent, при помощи parents-property с атрибутом rename можно задать дополнительные поля, значения которых фиксируются при создании события, например, поля справочников по ссылке.
При удалении объекта в событиях, унаследованных от BaseTrackingEvent и BaseSnapshotEvent, сохраняются последние значения полей перед удалением.
Схема типов событий#
Любое событие является наследником типа BaseEvent, который определяет базовый набор атрибутов события.
Представленная ниже диаграмма типов событий иллюстрирует взаимосвязи между типами событий и их базовыми типами.
Требования к модели данных#
В модели данных необходимо описать события (event), которые будут обрабатываться при работе сервиса DataSpace Subscription.
Явные события в модели данных#
Явное событие может выступать как в роли самостоятельного агрегата, так и являться дочерним объектом одного из существующих агрегатов в модели — для этого у события должна быть объявлена ссылка на родительский класс (размеченная атрибутом parent=»true»). Событие, у которого отсутствует родительская ссылка, является самостоятельным агрегатом.
Данный вид событий необходимо явно создавать в составе пакета команд.
Отличия явного еvent от класса:
для него определены только операции создания и поиска;
служит только для подписки на него;
не наследуется;
имеет плоскую структуру;
допустима ссылка на родительский класс для формирования агрегатной связи;
event, у которого отсутствует родительская ссылка, является самостоятельным агрегатом;обработанный всеми подписками
eventавтоматически удаляется по истечении времени;не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.
Пример описания события, являющегося самостоятельным агрегатом:
<model model-name="MyEventDemo" version="DEV-SNAPSHOT">
<event name="MyEvent">
<property name="message" type="String"/>
</event>
</model>
Пример события, которое входит в агрегат Product:
<model model-name="ProductEventDemo" version="DEV-SNAPSHOT">
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
</class>
<event name="ProductEvent">
<property name="message" type="String"/>
<property name="product" type="Product" parent="true"/>
</event>
</model>
Для свойств событий допустимыми атрибутами являются: name, type, parent, label, length, scale, mandatory, default-value.
Внимание!
При выполнении поисковых операций следует учитывать, что обработанные события по прошествии заданного времени удаляются из БД.
BaseEvent#
Любое событие наследует от абстрактного класса BaseEvent, имеющего следующие свойства:
Свойства BaseEvent:
Свойство |
Тип |
Описание |
|---|---|---|
objectId |
String (254) |
ID события |
creationTimestamp |
OffsetDateTime |
Метка времени создания события |
type |
String (254) |
Тип события (имя класса события). |
status |
enum |
Статус обработки события: |
lastChangeDate |
Date |
Время последнего изменения записи |
aggregateRootId |
String (254) |
ID корня агрегата |
ownerId |
String (254) |
ID тенанта |
partitionHash |
Long |
Предрассчитанный хеш для деления по партициям. |
Автоматические события в модели данных#
Автоматические события, в отличие от явных, не нужно создавать в пакете команд. Они создаются самим модулем DataSpace в ответ на некоторые действия, произошедшие в процессе работы модуля.
Отличия автоматического еvent от класса:
для него определена только операции поиска;
служит только для подписки на него;
не может иметь вручную объявленные свойства;
обязательна ссылка на родительский класс для формирования агрегатной связи;
обработанный всеми подписками
eventавтоматически удаляется по истечении времени;должно наследоваться от предопределенных базовых типов автоматических событий (подробно будут рассмотрены далее):
BaseChangeEvent,BaseObjectEvent,BaseTrackingEvent,BaseSnapshotEvent,BaseMergeEvent,BaseHistoryEvent;
не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.
События слияния/разъединения (BaseMergeEvent)#
Отдельным видом событий изменения являются события слияния/разъединения.
Если сконфигурировать подписку на события слияния/разъединения, то при выполнении операции слияния/разъединения сформируется событие слияния/разъединения, которое будет опубликовано в соответствии с подпиской.
Для этого в модели предметной области необходимо объявить event, унаследованный от BaseMergeEvent при помощи атрибут extends со значением BaseMergeEvent.
Этот event необходимо связать с классом, содержащим референс, обрабатываемый при слиянии/разъединении.
Внимание!
Способ объявления при помощи признака
merge-event="true"является устаревшим, пользоваться им не следует.
Такие event не доступны для создания через пакет команд или mutation GraphQL. В качестве значения тенанта (атрибут ownerId) для таких событий устанавливается значение, равное значению тенанта того агрегата, в рамках которого произошли изменения.
Для событий слияния/разъединения обязательно наличие ссылки на родительский класс, но других собственных свойств для них задать нельзя.
Пример объявления:
<model model-name="MergeEventDemo" version="DEV-SNAPSHOT">
<external-types>
<external-type type="Client" merge-kind="Organization"/>
</external-types>
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
<property name="code" type="String" label="Код"/>
<reference name="client" type="Client" label="Идентификатор клиента"/>
</class>
<event name="ProductClientMergeEvent" extends="BaseMergeEvent">
<property name="product" type="Product" parent="true"/>
</event>
</model>
Собственные свойства BaseMergeEvent:
Свойство |
Тип |
Описание |
|---|---|---|
mergeKind |
String (254) |
Вид слияния — параметр процесса слияния/разъединения |
referenceName |
String (254) |
Имя обрабатываемой при слиянии ссылки |
fromReference |
String (254) |
Предыдущее значение ссылки |
toReference |
String (254) |
Новое значение ссылки |
mergeFailed |
Boolean |
Признак успешного завершения процесса слияния/разъединения («true» — если процесс завершился неудачей, «false» или «null» — если успешно) |
errorCode |
String (5) |
Код ошибки |
errorMessage |
String (254) |
Сообщение об ошибке |
События слияния/разъединения создаются автоматически сервисом DataSpace ReferenceUpdater.
Если процесс слияния/разъединения завершился успешно, то формируется event со значением поля mergeFailed = false.
Предусмотрена возможность создания события в том случае, если процесс слияния/разъединения завершился с ошибкой, требующей постороннего вмешательства.
За это отвечает настройка сервиса дедубликации duplication.service.enable-failed-merge-events. По умолчанию формирование таких событий выключено: duplication.service.enable-failed-merge-events=false.
Если перевести настройку duplication.service.enable-failed-merge-events сервиса дедубликации в значение true, то при возникновении ошибки процесса слияния/разъединения будут создаваться события со значением поля mergeFailed = true.
При этом также заполняются поля errorCode — код ошибки и errorMessage — сообщение об ошибке.
Виды ошибок, при которых формируется event со значением поля mergeFailed = true, предопределены.
Доступен вид ошибок UNIQ — ошибка нарушения уникальности.
Для подписки на неуспешные события слияния/разъединения необходимо использовать фильтр root.mergeFailed == true в поле criteria для подписки.
Примечание
Если функциональность обработки событий слияния/разъединения клиентов первый раз используется потребителем, начиная с версии компонентов DataSpace 1.11.0 и выше, и если настройка сервиса дедубликации
duplication.service.enable-failed-merge-eventsустановлена в значениеtrue, то для подписки на успешные события слияния/разъединения достаточно использовать фильтрroot.mergeFailed == falseв полеcriteriaдля подписки.Если данная функциональность использовалась до версии 1.11.0, а также если настройка сервиса дедубликации
duplication.service.enable-failed-merge-eventsустановлена в значениеtrue, то следует использовать фильтрcoalesce(root.mergeFailed, false) != trueв полеcriteriaдля подписки.
Примечание
События изменений также будут формироваться при работе сервиса дедубликации, если они отслеживают изменения дедублицируемой ссылки.
Диаграмма событийного обмена при слиянии/разъединении#
События создания записи историцирования (BaseHistoryEvent)#
Данный тип событий применим только к историцируемым сущностям (см. Историцирование).
Для них обязательно наличие ссылки на родительский класс, но других собственных свойств для них задать нельзя.
Событие формируется при создании записи в истории родительской сущности. Для родительского типа можно объявить только один event, унаследованный от BaseHistoryEvent. Пример объявления:
<model model-name="HistoryEventDemo" version="DEV-SNAPSHOT">
<class name="Account">
<property name="number" type="String" length="32" unique="true"/>
<property name="accountType" type="String" length="4" />
<property name="currency" type="String" length="3" historical="true" />
<property name="amount" type="Decimal" length="20" scale="2" historical="true"/>
</class>
<!-- создаем дочерний AccountHistoryEvent -->
<event name="AccountHistoryEvent" extends="BaseHistoryEvent">
<property name="account" type="Account" parent="true"/> <!-- Примечание: Тип `Account` историцируемый, т.к. есть историцируемые поля -->
</event>
</model>
События отслеживания изменений (BaseChangeEvent, BaseObjectEvent, BaseTrackingEvent, BaseSnapshotEvent)#
События отслеживания изменений формируются в ответ на изменения отслеживаемых классов.
Таким образом дается возможность подписаться на изменение сущностей.
Для этого в модели предметной области необходимо объявить event, указав в качестве типа parent свойства тот класс, изменения которого необходимо отслеживать и формировать события.
А также необходимо указать в атрибуте extends один из базовых, предопределенных классов событий изменения сущностей:
BaseChangeEvent;BaseObjectEvent;BaseTrackingEvent;BaseSnapshotEvent.
Для каждого из них предусмотрена своя логика формирования событий, описанная в соответствующих подразделах ниже.
Некоторые виды автоматических событий имеют только базовый набор свойств события, тогда как другие могут сохранять частичное или полное состояние сущности, получаемое в результате изменения.
В BaseChangeEvent и BaseTrackingEvent имеется возможность указать, изменения каких именно свойств необходимо отслеживать, при этом событие формируется при изменении любого из отслеживаемых свойств.
Для этого в спецификации события event используется специальный элемент parents-property.
BaseObjectEvent, BaseTrackingEvent и BaseSnapshotEvent отслеживают не только изменения, но и создание и удаление сущности.
Могут отслеживаться свойства следующих типов:
примитивные типы;
перечисления;
ссылки на другой класс (отслеживается изменение самой ссылки, а не сущности по ссылке);
reference;
свойства статусной модели (сгенерированные
statusFor<наблюдатель>);ссылки на справочники;
embeddable типы.
Свойства-коллекции указывать в качестве отслеживаемых нельзя.
Особенности обработки транзакции с созданием событий отслеживания изменений#
Для того чтобы формируемые события с сохранением состояния содержали актуальное значение всех сохраняемых свойств
все отслеживаемые сущности блокируются при выполнении пакета (кроме операций создания, т.к. это не имеет смысла).
В качестве альтернативы предусмотрена опция, позволяющая вместо блокировки выполнять повторное чтение обновленных сущностей непосредственно перед формированием события.
Данную опцию можно включить настройкой dataspace-core.events.lock-updated-entity-for-change-event — значение по умолчанию false, т.е. обновленные сущности считываются повторно.
К операциям удаления всегда применяется только блокирование.
При наличии в модели данных пользовательских событий, унаследованных от имеющихся базовых типов классов событий изменения сущности, а также при изменении соответствующих сущностей при выполнении пакета команд, в результате заданные события сформируются автоматически в той же транзакции.
Внимание!
Следует учесть, что событие удаления не будет сформировано, если в результате выполнения пакета команд удаляется корень агрегата, которому принадлежит событие.
Модель данных для иллюстрации примеров работы с событиями отслеживания изменений#
Далее примеры будут базироваться на следующей модели предметной области:
<?xml version="1.0" encoding="UTF-8" ?>
<model model-name="ChangeEventsModel" version="DEV-SNAPSHOT" xmlns="DataspaceModel">
<!-- Внешний тип -->
<external-types>
<external-type type="Client" merge-kind="ORGANIZATION"/>
<external-type type="Document" />
</external-types>
<!-- Корень агрегата -->
<class name="AccountGroup" label="Группа счетов клиента — агрегат">
<reference name="groupClient" type="Client" unique="true"/>
<property name="accounts" type="Account" collection="set" mappedBy="accountGroup"/>
<property name="statementInfo" type="StatementInfo" collection="set" mappedBy="accountGroup"/>
</class>
<!-- Отслеживаемая сущность для демонстрации примеров событий -->
<class name="Account" label="Клиентский счет">
<!-- Родительская ссылка на корень агрегата -->
<property name="accountGroup" type="AccountGroup" parent="true"/>
<!-- Свойство примитивного типа -->
<property name="accountType" type="String" length="4" />
<!-- Свойство примитивного типа -->
<property name="number" type="String" length="32" unique="true"/>
<!-- Embeddable свойство -->
<property name="balance" type="Balance" historical="true"/>
<!-- Ссылка на дочернюю сущность -->
<property name="statementInfo" type="StatementInfo" />
<!-- Ссылка на справочник -->
<property name="branch" type="Branch" />
<!-- Референс на внешнюю сущность -->
<reference name="client" type="Client"/>
<!-- Свойство примитивного типа Text-->
<property name="description" type="Text" />
<!-- Свойство примитивного типа Binary-->
<property name="hash" type="Binary" />
<!-- Свойство с типом enum, имя которого совпадает с именем служебного атрибута события -->
<property name="status" type="StatusEnum" mandatory="true"/>
<!-- Референс на другой агрегат -->
<reference name="product" type="Product" />
<!-- Коллекция дочерних элементов -->
<property name="postings" type="Posting" collection="set" mappedBy="account"/>
<!-- Коллекция примитивного типа -->
<property name="tags" type="String" collection="set" />
<!-- Коллекция внешних референсов -->
<reference name="documents" type="Document" collection="set"/>
<!-- <property name="statusForAccounting" type="Status"/> будет создан при генерации DataSpace на основании статусной модели -->
</class>
<!-- Enum -->
<enum name="StatusEnum">
<value name="ACTIVE"/>
<value name="INACTIVE"/>
</enum>
<!-- Embeddable -->
<class name="Balance" embeddable="true">
<property name="value" type="Decimal" length="20" scale="2"/>
<property name="currency" type="String" length="3"/>
</class>
<!-- Справочник -->
<class name="Branch" is-dictionary="true">
<property name="code" type="String" />
<property name="name" type="String" />
<property name="location" type="String"/>
</class>
<!-- Дочерний класс -->
<class name="Posting" label="Движение по счету">
<property name="account" type="Account" parent="true"/>
<property name="amount" type="Decimal" />
<property name="description" type="String" length="255"/>
</class>
<!-- Еще один дочерний класс -->
<class name="StatementInfo" label="Данные для выписки">
<property name="accountGroup" type="AccountGroup" parent="true"/>
<property name="periodicity" type="Integer" />
<property name="title" type="String" />
<property name="name" type="String" />
<property name="deliveryAddress" type="Text" />
</class>
<!-- Статусная модель -->
<status-classes class="Account">
<stakeholder name="Accounting" code="accounting" />
</status-classes>
<statuses class="Account">
<stakeholder-link code="accounting">
<status code="open" description="счет открыт без ограничений" name="открыт" initial="true">
<to status="frozen"/>
<to status="closed"/>
</status>
<status code="frozen" name="заблокирован">
<to status="open"/>
<to status="closed"/>
</status>
<status code="closed" name="закрыт"/>
</stakeholder-link>
</statuses>
<!-- Отдельный агрегат -->
<class name="Product" label="Продукт" lockable="true">
<property name="name" type="String" label="Наименование"/>
<property name="code" type="String" label="Код"/>
<reference name="client" type="Client" label="Идентификатор клиента"/>
</class>
<event name="AccountChangeEvent" extends="BaseChangeEvent">
<property name="account" type="Account" parent="true"/>
<parents-property name="statusForAccounting"/>
<parents-property name="client"/>
</event>
<event name="AccountObjectEvent" extends="BaseObjectEvent">
<property name="account" type="Account" parent="true"/>
</event>
<event name="AccountTrackingEvent" extends="BaseTrackingEvent">
<property name="account" type="Account" parent="true"/>
<parents-property name="statusForAccounting.code"/> <!-- Примечание: сохраняется code статуса, отслеживается изменения идентификатора статуса -->
<parents-property name="client"/>
<parents-property name="statementInfo"/> <!-- Примечание: сохраняется и отслеживается значение ссылки -->
<parents-property name="statementInfo.title"/> <!-- Примечание: дополнительно сохраняется значение свойства title сущности по ссылке statementInfoChildRef (если ссылка не `null`) -->
</event>
<event name="AccountSnapshotEvent" extends="BaseSnapshotEvent" snapshot-large-properties="true"> <!-- Примечание: у события задан атрибут snapshot-large-properties="true", т.е. в данных события будут сохранены значения свойств description c типом Text и hash с типом Binary-->
<property name="account" type="Account" parent="true"/>
<parents-property name="status" rename="accountStatus"/> <!-- Примечание: требуется явное переименование, поскольку status пересекается с именем служебного свойства события) -->
<parents-property name="statementInfo.title"/> <!-- Примечание: дополнительно сохраняется значение свойства title сущности по ссылке statementInfoChildRef (если ссылка не `null`) -->
</event>
</model>
Элемент parents-property#
Для указания отслеживаемых свойств существует специальный элемент в модели — parents-property со следующими атрибутами:
Имя атрибута |
Назначение |
|---|---|
name |
а) Имя отслеживаемого свойства в указанном родительском классе или |
rename |
Используется в событиях, сохраняющих состояние изменившейся сущности, если при сохранении необходимо переименовать родительское свойство или свойство сущности по ссылке (например, при коллизии имен). |
Различные варианты задания parents-property иллюстрирует пример ниже, в котором создается дочернее событие ParentsPropertiesSample, унаследованное от BaseTrackingEvent. Его элементы parents-property демонстрируют способы объявления свойств родительского класса.
<event name="ParentsPropertiesSample" extends="BaseTrackingEvent">
<property name="account" type="Account" parent="true"/>
<!-- указание на свойство-статус statusForAccounting -->
<parents-property name="statusForAccounting"/>
<!-- указание на примитивное свойство number — в результирующем классе события будет создано примитивное свойство number -->
<parents-property name="number" />
<!-- указание на ссылку на справочник branch — в результирующем классе события будет создано свойство-ссылка на справочник Branch -->
<parents-property name="branch" />
<!-- указание на примитивное свойство code справочника Branch — в результирующем классе события будет создано примитивное свойство branchCode -->
<parents-property name="branch.code" rename="branchCode" />
<!-- указание на свойство-перечисление status, переименование необходимо, поскольку имя совпадает с именем служебного свойства события — в результирующем классе события будет создано примитивное свойство accountStatus -->
<parents-property name="status" rename="accountStatus" />
<!-- указание на свойство-reference product — в результирующем классе события будет создано свойство-reference -->
<parents-property name="product" />
<!-- указание на embeddable-свойство balance целиком — в результирующем классе события свойства будет создано такое же embeddable свойство -->
<parents-property name="balance" />
<!-- указание на примитивное поле value у свойства balance с типом embeddable — в результирующем классе события будет создано примитивное свойство value -->
<parents-property name="balance.value" />
<!-- указание на свойство-ссылку statementInfo — в результирующем классе события будет создано свойство-ссылка -->
<parents-property name="statementInfo" />
<!-- указание на примитивное свойство name у свойства statementInfo — в результирующем классе события будет создано примитивное свойство infoName -->
<parents-property name="statementInfo.name" rename="infoName" />
</event>
BaseChangeEvent#
При объявлении для сущности дочернего события, унаследованного от BaseChangeEvent, событие указанного типа будет формироваться при изменении любого из свойств сущности из списка parents-property (и только в этом случае).
Значения свойств отслеживаемой сущности в данных события не сохраняются, элементы parents-property задаются только для отслеживания изменений.
Собственные свойства BaseChangeEvent:
Свойство |
Тип |
Описание |
|---|---|---|
sysVersion |
Long |
Версия сущности (ее агрегата) по результатам операции |
sysTimeChanged |
OffsetDateTime |
Метка времени изменения, одинакова для всех событий в транзакции и совпадает с меткой времени изменения агрегата |
Пример объявления BaseChangeEvent события, формируемого при изменении любого из свойств statusForAccounting, product:
<event name="AccountChangeEvent" extends="BaseChangeEvent">
<property name="account" type="Account" parent="true"/>
<parents-property name="statusForAccounting"/>
<parents-property name="product"/>
</event>
BaseObjectEvent#
При объявлении для сущности дочернего события, унаследованного от BaseObjectEvent, событие указанного типа будет формироваться при изменении любого из свойств сущности (включая изменение коллекций примитивов и референсов), а также создании и удалении объекта.
Эти события не хранят состояния отслеживаемой сущности. Для данного типа запрещено явное указание элементов parents-property, т.к. это не несет смысла.
Внимание!
Коллекции ссылок с типом связи «один-ко-многим» и mapped-by ссылки с типом связи один-к-одному не отслеживаются ни одним из типов событий.
Внимание!
При использовании данного вида события следует учитывать особенности отслеживания изменений сущностей при выполнении пакета команд Packet (см. раздел «Особенности отслеживания изменений сущностей при выполнении пакета команд Packet» документа «Руководство прикладного разработчика».
В модели данных может быть только одно событие BaseObjectEvent для каждого пользовательского типа.
Собственные свойства BaseObjectEvent:
Свойство |
Тип |
Описание |
|---|---|---|
sysVersion |
Long |
Версия сущности (ее агрегата) по результатам операции |
sysTimeChanged |
OffsetDateTime |
Метка времени изменения. Одинакова для всех событий в транзакции и совпадает с меткой времени изменения агрегата |
sysObjectEvent |
enum SysObjectEvent |
Тип действия, совершенного над отслеживаемой сущностью: |
Пример объявления BaseObjectEvent события, формируемого при изменении любого из свойств родительского класса Account, кроме postings,
при создании и удалении объектов типа Account.
<event name="AccountObjectEvent" extends="BaseObjectEvent">
<property name="account" type="Account" parent="true"/>
</event>
BaseTrackingEvent#
При объявлении для сущности дочернего события, унаследованного от BaseTrackingEvent, событие указанного типа будет формироваться при изменении любого из перечисленных в parents-property свойств сущности, а также создании и удалении объекта.
Значения указанных при помощи элементов parents-property свойств родительской сущности по состоянию на момент завершения операции сохраняются в копии этих свойств в данных события. При удалении сущности сохраняются значения, которые эти свойства имели непосредственно перед удалением.
Доступна возможность сохранения свойств сущностей по ссылке, а также переименования сохраняемых свойств.
Внимание!
Элементы
parents-propertyтиповBaseTrackingEventне могут указывать на коллекционные свойства.
Собственные свойства BaseTrackingEvent:
Свойство |
Тип |
Описание |
|---|---|---|
sysVersion |
Long |
Версия сущности (ее агрегата) по результатам операции |
sysTimeChanged |
OffsetDateTime |
Метка времени изменения. Одинакова для всех событий в транзакции и совпадает с меткой времени изменения агрегата |
sysObjectEvent |
enum SysObjectEvent |
Тип действия, совершенного над отслеживаемой сущностью: |
sysChangeUser |
String |
Информация о пользователе, изменившем данные, используется значение из HTTP-заголовка (конфигурирование способа получения информации описано в разделе «Способы получения информации о пользователе» документа «Руководство по системному администрированию») |
При изменении атрибутного состава родительского класса (например, в родительском классе добавили или удалили атрибут, или изменили тип, или изменился признак deprecated) класс события также обновится соответствующим образом.
Пример объявления BaseTrackingEvent события, формируемого при изменении любого из свойств statusForAccounting, description, statementInfo, а также при создании и удалении объектов:
<event name="AccountTrackingEvent" extends="BaseTrackingEvent">
<property name="account" type="Account" parent="true"/>
<!-- Сохраняется code статуса, отслеживается изменения идентификатора статуса -->
<parents-property name="statusForAccounting.code"/>
<parents-property name="description"/>
<!-- Сохраняется и отслеживается значение ссылки -->
<parents-property name="statementInfo"/>
<!-- Дополнительно сохраняется значение свойства сущности по ссылке (если ссылка не `null`) -->
<parents-property name="statementInfo.title"/>
</event>
BaseSnapshotEvent#
При объявлении для сущности дочернего события, унаследованного от BaseSnapshotEvent, событие указанного типа будет формироваться при изменении любого из свойств сущности (включая изменение коллекций примитивов и референсов), а также создании и удалении объекта.
Это событие сохраняет в своих данных состояние всех свойств по состоянию на момент завершения операции, за исключением коллекций и свойств с типами Text и Binary (то есть в классе события будут созданы соответствующие свойства).
При удалении сущности сохраняются значения, которые эти свойства имели непосредственно перед удалением.
Помимо этого, через элементы parents-property доступна возможность сохранения свойств сущностей по ссылке, а также переименования сохраняемых свойств.
Если необходимо сохранять и свойства с типами Text и Binary, то следует явно указать имена нужных свойств при помощи элемента parents-property.
Также имеется возможность сохранять значения всех свойств с типами Text и Binary, задав на элементе event события атрибут snapshot-large-properties="true".
Внимание!
Элементы
parents-propertyтиповBaseSnapshotEventне могут указывать на коллекционные свойства.
Внимание!
Следует учесть, что данные свойств с типами
TextиBinaryмогут занимать значительное место. Поэтому в событии следует сохранять только то, что действительно необходимо фиксировать на момент транзакции (что нельзя, например, получить при помощи GraphQL-query при публикации).
Внимание!
При использовании данного вида события следует учитывать особенности отслеживания изменений сущностей при выполнении пакета команд Packet (см. раздел «Особенности отслеживания изменений сущностей при выполнении пакета команд Packet» документа «Руководство прикладного разработчика».
Состав собственных атрибутов BaseSnapshotEvent совпадает с BaseTrackingEvent.
При изменении атрибутного состава родительского класса (например, в родительском классе добавили или удалили атрибут, или изменили тип, или изменился признак deprecated) класс события также соответствующим образом изменится.
Пример объявления BaseSnapshotEvent события, формируемого при изменении любого из свойств parent класса, кроме postings, а также при создании и удалении объектов:
<event name="AccountSnapshotEvent" extends="BaseSnapshotEvent" snapshot-large-properties="true"> <!-- Примечание: включен snapshot-large-properties, т.е. сохраняются и свойства `someText`, `someBinary` -->
<property name="account" type="Account" parent="true"/>
<!-- Дополнительно сохраняется значение свойства сущности по ссылке (если ссылка не `null`) -->
<parents-property name="statementInfo.title"/>
<!-- Необходимо явное переименование, поскольку status пересекается с именем служебного свойства события) -->
<parents-property name="status" rename="accountStatus"/>
</event>
Спецификация подписок#
Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml. При сборке проекта файл попадает в конфигурацию DataSpace Subscription. При старте сервис DataSpace Subscription читает этот файл и загружает подписки в таблицу T_DSPC_SYS_SUBSCRIPTION.
Пример:
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="mergeSubscription1"
name="EPKMergeForProduct"
description="Публикация события с слиянии во внешнюю систему"
target="REST"
eventType="ProductClientMergeEvent"
callback="${my.merge.subscription.1.url}/api/v1/products"
validTill="20240101T00:00:00Z"
maxRetryAttempts="${my.merge.subscription.1.nretry}"
timeoutMs="${my.merge.subscription.1.timeout}"
retryDelayMs="${my.merge.subscription.1.retrydelay}"
async="false"
blocking="true"
idempotenceHeaderName="requestUID">
<criteria>
root.referenceName=='client' && coalesce(root.mergeFailed, false) != true
</criteria>
<query>
{
searchProduct(cond: "it.$id=='${product}'") {
elems {
code
name
services {
elems {
code
name
}
}
oneToOneLink {
name
}
}
}
}
</query>
<template>
[
{
"operation": "shift",
"spec": {
"data": {
"searchProduct": {
"elems": {
"*": {
"code": "ProductCodeMappedTo",
"oneToOneLink": {
"name": "oneToOneLinkNameMappedTo"
},
"services": {
"elems": {
"*": {
"code": "servicesMappedTo"
}
}
}
}
}
}
}
}
}
]
</template>
<headers>
-XTenantId=${my.merge.subscription.1.tenantId}
</headers>
</subscription>
</subscriptions>
Описание полей:
id— уникальный идентификатор подписки, используется в качестве primary key в таблице подписокT_DSPC_SYS_SUBSCRIPTION.Внимание!
Значение «0» зарезервировано системой.
name— наименование подписки.description— текстовое описание подписки.target— способ публикации (REST, KAFKA, GRAPHQLSUBS).eventType— тип событий (entity name), на которые осуществляется подписка.callback:REST API: URL для публикации сообщения. Можно указать HTTP-метод (POST, GET, PUT, PATCH, DELETE). HTTP-метод (если он указан) должен быть первым элементом строки, указанной в поле callback. Разрешается любой регистр и любое количество пробелов до и после. Если HTTP-метод не указан, то по умолчанию будет использован POST. Параметры URL могут содержать placeholders формата ${<наименование_свойства_события>}. Перед выполнением вызова placeholders будут заменены на соответствующие значения свойств события. Пример:
callback="GET localhost:8080/api/v1/products/touch?${fromClient}".Kafka: <наименование_кластера>:<наименование_topic>. Пример:
callback="PPK_KAFKA:topicName".
validTill— время окончания действия подписки.maxRetryAttempts— количество повторных попыток публикации (актуально только для target == REST).timeoutMs— тайм-аут публикации, в миллисекундах (актуально только для target == REST).retryDelayMs— задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST).async— использование асинхронной отправки (true/false).blocking— приостановление отправки при первой ошибке (true/false).idempotenceHeaderName— наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности. При всех попытках отправки одного и того же сообщения заголовок принимает одинаковое значение. Если атрибут не задан или значение пустое — заголовок не формируется.criteria— строковое выражение для фильтрации записей события. С описанием формата строковых выражений можно ознакомиться в документе «Строковые выражения».query— текст GraphQL-запроса для чтения дополнительных данных. С GraphQL-API можно ознакомиться в документе «Протокол GraphQL». Текст запроса может содержать placeholder формата ${<наименование_свойства_события>}. Перед выполнением GraphQL-запроса, placeholder будут заменены на соответствующие значения свойств события. <наименование_свойства_события> — наименование свойства JPA-класса события entity,nameкоторого прописан в полеeventTypeу конкретного subscription в subscriptions.xml.template— спецификация трансформации JSON-данных в JSON-сообщение. Для трансформации по умолчанию используется библиотека JOLT, поэтому для описания преобразования в параметре template используется формат спецификации этой библиотеки. Исходные данные для трансформации по шаблону имеют общий вид:{ "event": { <атрибуты события> }, "data": { <результат выполнения graphql-запроса. Если graphql-запрос в подписке не задан или не вернул данных — элемент пустой> } }headers— спецификация дополнительных заголовков сообщения. Формат: <наименование_заголовка>=<значение>. <значение> может содержать Placeholder формата ${<наименование_свойства_события>}, которые будут заменены аналогично логике, описанной для поляquery.
Примечание
На старте сервиса DataSpace Subscription проводится валидация полей
criteria,query,template. Если валидация не проходит, сервис завершает свою работу.
Примечание
В качестве placeholder могут использоваться следующие типы свойств события:
Примитивы, enum ${<наименование_свойства_события>};
Ссылки ${<наименование_ссылки_события>} (подставляется значение id);
Reference ${<наименование_reference_события>} (подставляется значение entityId);
Embeddable ${<наименование_embeddable_события>.<наименование_свойства_embeddable>} (подставляется значение свойства embeddable).
Внимание!
Поля
criteria,query,templateиheadersне могут содержать пустое значение. Если не требуется задавать значение для данных полей, то не нужно указывать соответствующие теги.
Стендозависимая конфигурация subscriptions.xml#
Потребитель может использовать в значениях полей target, callback, template, headers, maxRetryAttempts, timeoutMs, retryDelayMs Placeholder, которые заполнятся при загрузке модуля из конфигурации приложения. Конфигурация приложения может быть параметризована и сделана стендозависимой.
Для этого необходимо добавить в файл customer-properties-subscription.yaml список параметров со значениями. В качестве значения может быть использован также параметр развертывания, например, {{tenantId}}. Эти параметры должны быть перечислены также в файле custom_property.conf.yml, могут быть как со значениями по умолчанию, так и без значений.
При установке на стенд этим параметрам должны быть заданы значения в стендозависимой конфигурации.
Подробнее см. в документе «Руководство по установке» в следующих разделах:
«Добавление пользовательских параметров в дистрибутив».
«Использование пользовательских параметров в рамках сервиса DataSpace».
Пример customer-properties-subscription.yaml:
kind: ConfigMap
apiVersion: v1
metadata:
name: customer-properties
data:
customer.properties: |-
my.merge.subscription.1.url={{ customer.value1 }}
my.merge.subscription.1.nretry=1
my.merge.subscription.1.timeout={{ customer.value2 }}
my.merge.subscription.1.retrydelay=2000
my.merge.subscription.1.tenantId={{ customer.value3 }}
Пример custom_property.conf.yml:
customer.value1=http://localhost:8082
customer.value2=10000
customer.value3=tenantId
# ...
Гарантия последовательности публикации событий
Для гарантии последовательной отправки событий по агрегату необходимо для подписки:
выключить async-режим:
async="false";включить blocking-режим:
blocking="true".
Комбинацию async="true" blocking="true" использовать не следует, т.к. последовательность обработки в этом случае не гарантируется.
Процессы и масштабирование#
Основную логику сервиса DataSpace Subscription реализуют процессы EVENT_TRANSFER и EVENT_WORKER.
Задача процесса EVENT_TRANSFER состоит в том, чтобы для каждого события, по которому есть валидные подписки, сформировать записи в таблице очереди на публикацию T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего событие считается обработанным. При этом для всех выбранных новых event (для которых нашлись валидные подписки и были созданы записи в T_DSPC_SYS_EVENT_QUEUE_ITEM, а также для которых подписок не нашлось) статус меняется на PROCESSED.
Процесс EVENT_WORKER обрабатывает таблицу очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM: читает соответствующее событие с применением условия criteria подписки, выполняет запрос дополнительных данных, используя query, трансформирует данные в итоговое сообщение по шаблону template и осуществляет публикацию сообщения на основании target и callback.
Предусмотрена возможность масштабирования процессов EVENT_TRANSFER и EVENT_WORKER. За количество процессов, которые будут запущены, отвечают настройки dataspace.subscription.event-transfer.transfer-processes-count и dataspace.subscription.event-worker.worker-processes-count соответственно. Реализована возможность распределения процессов по партициям для того, чтобы процессы обрабатывали каждый свой набор данных.
За количество партиций для процессов EVENT_TRANSFERи EVENT_WORKER отвечают настройки dataspace.subscription.event-transfer.partitions-number и dataspace.subscription.event-worker.partitions-number соответственно. Партиции ведутся в разрезе подписок. Для процессов EVENT_TRANSFER зарезервирован идентификатор подписки со значением «0».
Внимание!
Настройки количества партиций для процессов
EVENT_TRANSFERиEVENT_WORKERдолжны иметь одинаковые значение для всех экземпляров сервиса DataSpace Subscription, работающих с одной БД. При рассогласовании значений данных настроек между разными экземплярами процессы будут брать в работу один и тот же набор данных, что приведет к нарушению порядка обработки событий.Также запрещено менять данные настройки в runtime.
Служебная таблица T_DSPC_SYS_SUBSCRIPTION_WORKER содержит информацию о партициях в разрезе подписок. Недостающие партиции добавляет служебный процесс на старте сервиса. Процессы EVENT_TRANSFER и EVENT_WORKER в процессе своей работы проверяют таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER на предмет необрабатываемых партиций и берут их в работу, прописывая свой идентификатор в поле PROCESSID. Партиция считается необрабатываемой, если PROCESSID не установлен, либо ее heartbeat не обновлялся определенное время, которое задается в настройках dataspace.subscription.event-transfer.heartbeat-timeout-sec и dataspace.subscription.event-worker.heartbeat-timeout-sec для процессов EVENT_TRANSFER и EVENT_WORKER соответственно.
Набор данных разбивается по партициям при помощи предварительно рассчитанного хеша, который хранится в поле PARTITIONHASH в таблицах
событий и очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM. При обработке записей процессы EVENT_TRANSFER и EVENT_WORKER вычисляют партицию, к которой принадлежит запись по формуле PARTITIONHASH mod <количество_партиций>. Если полученная партиция присутствует среди партиций, назначенных процессу, то он выполняет обработку записи.
Таблица T_DSPC_SYS_SUBSCRIPTION_PROCESS также способствует управлению масштабированием процессов. При старте сервиса процессы регистрируются в данной таблице, записывая свой идентификатор в поле OBJECT_ID. В процессе работы процессы пересчитывают значение поля MAXPARTITIONS, которое содержит максимальное количество партиций, обрабатываемых процессом, по следующей формуле: <количество_подписок> * <количество_партиций> / <количество_процессов>. Если процессу назначено больше партиций, чем MAXPARTITIONS, то он освободит нужное количество.
Поскольку EVENT_WORKER может обрабатывать последовательно партиции разных подписок, то при сбое одного из получателей может возникнуть задержка обработки для других подписок.
Для того чтобы работоспособные получатели продолжали получать сообщения с неизменной скоростью, реализован специальный тип процессов — ERROR_WORKER.
Алгоритм действия сервиса при возникновении ошибки по одной из партиций подписки:
Все партиции подписки, при обработке которой произошла ошибка,
EVENT_WORKERпереключает в состояниеstate = ERRORв таблицеT_DSPC_SYS_SUBSCRIPTION_WORKERи перестает их обрабатывать, установив значениеprocessId = null.Процесс
ERROR_WORKERвыбирает для обработки партиции с состояниемERRORилиCIRCUIT_BREAKING, устанавливая свойprocessIdдля соответствующих партиций.Процесс
ERROR_WORKERобрабатывает полученные партиции по алгоритму, аналогичномуEVENT_WORKER.Если очередное событие было успешно обработано, то
ERROR_WORKERдолжен переключить партицию в состояниеstate = UNASSIGNEDи установитьprocessId = nullдля того, чтобы ее снова подхватилEVENT_WORKER.
При этом за время восстановления в очереди на отправку могло накопиться значительное число событий, что также может повлиять на скорость отправки по тем подпискам, по которым ошибки отсутствуют.
Поэтому перед переключением партиции ERROR_WORKER анализирует размер очереди по партиции и, если он выше порогового значения (dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold), то ERROR_WORKER продолжает обрабатывать партицию, пока размер очереди не станет ниже порогового значения.
Для процессов ERROR_WORKER также предусмотрена возможность масштабирования:
За количество процессов, которые будут запущены, отвечает настройка
dataspace.subscription.event-worker.error-worker-processes-count.Количество партиций для процессов
ERROR_WORKERравно количеству партиций, установленному для процессовEVENT_WORKER(dataspace.subscription.event-worker.partitions-number).
Интеграция с GraphQL Subscriptions#
Сервис DataSpace Subscription обеспечивает реализацию функциональности GraphQL Subscriptions для сервиса DataSpace Core.
Для настройки данной интеграции должны быть сконфигурированы следующие параметры сервиса DataSpace Subscription:
dataspace.graphql.subscriptions.enabled— включает функциональность, обслуживающую работу GraphQL Subscription для сервиса DataSpace Core (по умолчанию равен «true» — включено);dataspace.subscription.grpcServerPort— задает порт, на котором будет запущен gRPC-сервер для обслуживания запросов GraphQL Subscription (по умолчанию равен «9001»);dataspace.subscription.event-worker.gql-worker-processes-count— количество процессов GQL_EVENT_WORKER, обрабатывающих события (по умолчанию равен «1»).
Служебные таблицы#
T_DSPC_SYS_SUBSCRIPTION — таблица подписок. Данные загружаются из файла subscriptions.xml при старте сервиса.
Содержит следующие поля:
Поле |
Описание |
|---|---|
OBJECT_ID |
Идентификатор подписки, задает пользователь (значение «0» зарезервировано системой) |
NAME |
Имя подписки (для отображения, включая labels мониторинга) |
DESCRIPTION |
Описание подписки |
EVENTTYPE |
Entity name событий, на которые осуществляется подписка |
CRITERIA |
Строковое выражение для фильтрации записей события |
QUERY |
Текст GraphQL-запроса для чтения дополнительных данных |
TEMPLATE |
Спецификация трансформации JSON-данных в JSON-сообщение |
TARGET |
Способ публикации («REST», «KAFKA», «GRAPHQLSUBS») |
CALLBACK |
URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_topic> в случае Kafka |
HEADERS |
Спецификация дополнительных заголовков сообщения |
IDEMPOTENCEHEADERNAME |
Наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности |
ASYNC |
Признак использования асинхронной отправки («true»/»false») |
BLOCKING |
Признак приостановки отправки при первой ошибке («true»/»false») |
MAXRETRYATTEMPTS |
Количество повторных попыток публикации (актуально только для TARGET == REST) |
RETRYDELAYMS |
Задержка между повторными попытками публикации, в миллисекундах (актуально только для TARGET == REST) |
TIMEOUTMS |
Тайм-аут публикации, в миллисекундах (актуально только для TARGET == REST) |
VALIDTILL |
Время окончания действия подписки |
SYS_OWNERID |
Владелец подписки (ID тенанта) |
SYS_LASTCHANGEDATE |
Дата последнего изменения записи |
T_DSPC_SYS_EVENT_QUEUE_ITEM — таблица очереди обработки событий. Заполняется процессами EVENT_TRANSFER. Обрабатывается процессами EVENT_WORKER.
Содержит следующие поля:
Поле |
Описание |
|---|---|
OBJECTID_EVENTID |
Ссылка на событие (часть первичного ключа) |
OBJECTID_SUBSCRIPTIONID |
Ссылка на подписку (часть первичного ключа) |
EVENTTYPE |
Entity name события |
STATUS |
Статус обработки элемента очереди событий («NEW», «SENT», «ERROR», «SKIP») |
AGGREGATEROOTTYPE |
Тип корня агрегата |
AGGREGATEROOTID |
Идентификатор корня агрегата события |
PARTITIONHASH |
Предварительно рассчитанный хеш для деления по партициям |
IDEMPOTENCEHEADER |
UUID-значение, формируемое при создании публикации и передаваемое в специальном заголовке при публикации сообщения |
CREATIONTIMESTAMP |
Время добавления события в очередь |
SYS_LASTCHANGEDATE |
Дата последнего изменения записи |
Значения поля STATUS:
NEW— новый элемент очереди событий, еще не был обработан;SENT— элемент очереди событий успешно обработан, сообщение отправлено;ERROR— при обработке произошла ошибка, элемент будет обработан повторно;SKIP— подходящее событие не найдено в процессе обработки элемента очереди событий (скорее всего событие не прошло по условию, указанному в полеcriteriaподписки).
T_DSPC_SYS_SUBSCRIPTION_WORKER
Содержит следующие поля:
Поле |
Описание |
|---|---|
OBJECTID_PARTITIONNUMBER |
Номер партиции (часть первичного ключа) |
OBJECTID_SUBSCRIPTIONID |
Ссылка на подписку (часть первичного ключа) |
HEARTBEAT |
Время последнего обновления |
PROCESSID |
Идентификатор процесса |
STATE |
Состояние обработки партиции («ACTIVE», «CIRCUIT_BREAKING», «UNASSIGNED», «ERROR») |
T_DSPC_SYS_SUBSCRIPTION_PROCESS
Содержит следующие поля:
Поле |
Описание |
|---|---|
OBJECT_ID |
Идентификатор процесса (первичный ключ) |
MAXPARTITIONS |
Максимальное количество партиций, обрабатываемых процессом |
KIND |
Вид процесса (worker/перекладчик) («EVENT_WORKER», «EVENT_TRANSFER») |
HEARTBEAT |
Время последнего обновления |
Конфигурация#
dataspace.subscription.idempotenceHeaderUuidWithHyphens
#Значение по умолчанию: "true"
#true — UUID для заголовка идемпотентности idempotenceHeaderName формируется с дефисами, в результате его размер равен 36 символам
#false — UUID для заголовка идемпотентности idempotenceHeaderName формируется без дефисов, в результате его размер равен 32 символам
dataspace.subscription.event-transfer.transfer-processes-count
#Значение по умолчанию: 1
#Количество процессов перекладчика (EVENT_TRANSFER)
dataspace.subscription.event-transfer.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы перекладчика
dataspace.subscription.event-transfer.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов перекладчика (EVENT_TRANSFER)
dataspace.subscription.event-transfer.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс перекладчика
dataspace.subscription.event-transfer.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс перекладчика будет считаться неактивным
dataspace.subscription.event-transfer.max-transfer-count
#Значение по умолчанию: 1000
#Количество событий, обрабатываемых перекладчиком за один цикл работы
dataspace.subscription.event-worker.worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER
dataspace.subscription.event-worker.error-worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER, обрабатывающего ошибки (ERROR_WORKER)
dataspace.subscription.event-worker.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы процесса EVENT_WORKER
dataspace.subscription.event-worker.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов EVENT_WORKER
dataspace.subscription.event-worker.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс EVENT_WORKER
dataspace.subscription.event-worker.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс EVENT_WORKER будет считаться неактивным
dataspace.subscription.event-worker.circuit-breaker-error-count-threshold
#Значение по умолчанию: 10
#Количество ошибок по подписке, по достижении которого включается circuit breaker на эту подписку
dataspace.subscription.event-worker.circuit-breaker-timeout-ms
#Значение по умолчанию: 30000 миллисекунд
#Время с момента включения circuit breaker на подписку, по истечении которого circuit breaker будет выключен
dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold
#Значение по умолчанию: 100
#Пороговое значение размера очереди накопившихся сообщений по подписке
#Пока размер очереди выше указанного значения порога, `ERROR_WORKER` не будет переключать партицию обратно на `EVENT_WORKER`
dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker
#Значение по умолчанию: 5
#Максимальное количество потоков (на процесс) в пуле потоков, используемом для асинхронной отправки
#Результирующее максимальное количество потоков равно:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)
dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker
#Значение по умолчанию: 1000
#Максимальное количество элементов в очереди (на процесс) пула потоков, используемом для асинхронной отправки
#Результирующий размер очереди равен:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)
dataspace.subscription.event-worker.gql-worker-processes-count
#Значение по умолчанию: 1
#Количество процессов GQL_EVENT_WORKER, обрабатывающих события для работы функциональности GraphQL Subscription сервиса DataSpace Core
#Процессы типа GQL_EVENT_WORKER не отображаются в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS
dataspace.subscription.service-tasks.pool-size
#Значение по умолчанию: 5
#Количество потоков в пуле служебных процессов
dataspace.subscription.service-tasks.delete-dead-processes-schedule-delay-ms
#Значение по умолчанию: 10000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который удаляет неактивные процессы EVENT_TRANSFER и
#EVENT_WORKER из таблицы T_DSPC_SYS_SUBSCRIPTION_PROCESS
dataspace.subscription.service-tasks.load-subscriptions-schedule-delay-ms
#Значение по умолчанию: 1000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который выполняет загрузку подписок из файла subscriptions.xml
#в таблицу T_DSPC_SYS_SUBSCRIPTION, а также вставку недостающих партиций в таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER
#После успешной загрузки процесс перестает запускаться
dataspace.subscription.service-tasks.cleanup-depth-hr
#Значение по умолчанию: 48 часов
#Максимальное время хранения записей в таблицах событий и таблице очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего
#записи будут удалены фоновым процессом, если они уже были обработаны: status == PROCESSED для событий
#и status == SENT для записей T_DSPC_SYS_EVENT_QUEUE_ITEM
dataspace.subscription.service-tasks.cleanup-cron
#Значение по умолчанию: 0 0 0 * * * (каждый день в 00:00)
#Расписание запуска служебного процесса, который удаляет отработанные старые записи из таблиц событий и
#таблицы очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM
dataspace.subscription.service-tasks.tasks-schedule-initial-delay-ms
#Значение по умолчанию: 90000 миллисекунд
#Задержка перед запуском всех фоновых процессов на старте сервиса. Применяется для обеспечения корректной установки health-метрики
#readiness на старте сервиса.
dataspace.subscription.xml.enable-check-on-start
#Значение по умолчанию: true
#Включает/выключает валидацию полей query, criteria, template у подписок, описанных в subscription.xml при старте сервиса
#Только для стендов разработки! При эксплуатации в промышленных условиях должна быть установлена в значение "true"
dataspace.graphql.subscriptions.enabled
#Значение по умолчанию: true
#Включает/выключает функциональность, обслуживающую работу GraphQL Subscription для сервиса DataSpace Core.
dataspace.subscription.grpcServerPort
#Значение по умолчанию: 9001
#Порт, на котором будет запущен gRPC-сервер для обслуживания запросов GraphQL Subscription
Конфигурация в рамках ручной установки сервиса#
В случае использовании Helm-инсталляции в рамках раздела «Ручная установка сервиса» документа «Руководство по установке» необходимо перед инсталляцией задать параметры в values.yaml:
appConfig:
overrideProperties:
dataspace:
subscription:
publisher:
kafka:
config:
<наименование_кластера>:
acks: ''
bootstrap-servers: ''
delivery-timeout-ms: ''
enable-idempotence: ''
key-serializer: org.apache.kafka.common.serialization.StringSerializer
linger-ms: ''
max-in-flight-requests-per-connection: ''
request-timeout-ms: ''
retry-backoff-ms: ''
# Параметры SSL подключения
security-protocol: SSL
ssl-endpoint-identification-algorithm: ''
ssl-keystore-location: ''
ssl-truststore-location: ''
value-serializer: org.apache.kafka.common.serialization.StringSerializer
В среде контейнеризации создать секрет Kind: Secret, указав его имя в параметре в values.yaml:
appConfig:
subscription:
config: "dspc-subscription-ssl"
apiVersion: v1
kind: Secret
metadata:
name: dspc-subscription-ssl
data:
subscription.properties: |-
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-key-password=
Конфигурация в рамках автоматизированной установки сервиса#
В случае использовании автоматизированной установки в рамках раздела «Автоматизированная сборка и установка сервиса» документа «Руководство по установке», если предполагается использование функциональности публикации событий в Kafka, необходимо задать настройки для каждого кластера Kafka в файле customer-properties-subscription.yaml согласно разделу «Добавление пользовательских параметров в дистрибутив» в следующем виде (пример):
kind: ConfigMap
apiVersion: v1
metadata:
name: customer-properties
data:
customer.properties: |-
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.bootstrap-servers={{ local.kafka.bootstrap.servers }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.acks={{ local.kafka.acks }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.delivery-timeout-ms={{ local.kafka.delivery.timeout.ms }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.request-timeout-ms=200
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.retry-backoff-ms=500
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.linger-ms={{ local.kafka.linger.ms }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.enable-idempotence={{ local.kafka.enable.idempotence }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.max-in-flight-requests-per-connection={{ local.kafka.max.in.flight.requests.per.connection }}
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.security-protocol=SSL
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-location=/deployments/credentials/kafkaSslKeystore
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-location=/deployments/credentials/kafkaSslTrustwtore
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-endpoint-identification-algorithm=
Пример Kind: Secret:
apiVersion: v1
kind: Secret
metadata:
name: dspc-subscription-ssl
data:
subscription.properties: |-
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-password=12456
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-password=12456
dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-key-password=12456
Пример custom_property.conf.yml:
local.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
local.kafka.acks=all
local.kafka.delivery.timeout.ms=5000
local.kafka.linger.ms=0
local.kafka.enable.idempotence=true
local.kafka.max.in.flight.requests.per.connection=5
# ...
При необходимости параметры можно сделать стендозависимыми по аналогии со стендозависимой конфигурацией subscriptions.xml (см. раздел «Стендозависимая конфигурация subscriptions.xml»).
Внимание!
При использовании SSL установка значения «true» для параметра
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotenceможет приводить к ошибкеorg.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. Для того чтобы этого избежать, необходимо корректным образом настроить операциюIdempotentWriteдля кластера. Подробнее см. в официальной документации продукта Kafka.
В файле subscriptions.xml target необходимо задавать в формате <наименование_кластера>:<наименование_topic>.
Пример: LOCAL_KAFKA:test-topic.
Retry#
Параметры retry/timeout для Kafka задаются в настройках Kafka на конкретный кластер <наименование_кластера>. Для этого необходимо использовать параметры delivery-timeout-ms, request-timeout-ms, retry-backoff-ms.
Параметры retry/timeout для REST берутся из параметров подписки: maxRetryAttempts, timeoutMs, retryDelayMs. Для ошибок с http status code 4xx retry не выполняется (то есть retry — только для 5xx и в случае тайм-аута).
Для идемпотентности потребитель может определить необходимый заголовок (передавая какое-либо из полей event с уникальным значением).
Circuit breaking#
На уровне параметра подписки blocking вводится настройка режима обработки ошибки:
blocking=true— при первой ошибке запрет на отправку остальных публикаций по партиции подписки.blocking=false— разрешена публикация других событий по подписке до наступления условия приостановки.
Поведение при ошибках:
Производится фиксация ошибки. Статус «ERROR» устанавливается для элемента очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM.
Если параметр
blocking=false, то продолжается обработка следующих сообщений, при этом счетчик ошибок по подписке увеличивается.Если параметр
blocking=true, то полностью приостанавливается отправка по этой партиции, при этом счетчик ошибок по подписке увеличивается.При накоплении порогового значения ошибок по подписке (настройка
dataspace.subscription.event-worker.circuit-breaker-error-count-thresholdравна, например, «10») приостанавливается отправка по этой подписке. При этом включается circuit breaker на всю подписку на один экземпляр приложения.Спустя время, указанное в настройке
dataspace.subscription.event-worker.circuit-breaker-timeout-ms, производится попытка повторной отправки, начиная с тех, чья отправка завершилась с ошибкой.Успешная отправка сбрасывает счетчик ошибок.
Примечание
Каждый процесс EVENT_WORKER самостоятельно принимает решение о приостановке и возобновлении обработки партиции. Для того чтобы не останавливать весь поток при ошибке в данных одной из публикаций приостановится только одна партиция.
Мониторинг#
Мониторинг включается настройкой:
dataspace.subscription.metrics.enabled
#"false" — метрики выключены (значение по умолчанию)
#"true" — метрики включены
Реализованы следующие метрики мониторинга:
Наименование метрики |
Тип метрики |
Теги |
Описание |
|---|---|---|---|
dspc.subscription.handle.time |
Timer |
|
Время обработки событий в разрезе подписки и результата отправки |
dspc.subscription.publication.time |
Timer |
|
Время отправки сообщений в разрезе подписки и результата отправки |
dspc.subscription.errors.count |
Counter |
|
Количество ошибок отправки событий в разрезе подписки |
dspc.subscription.skip.count |
Counter |
|
Количество отсеянных событий в разрезе подписки |
dspc.subscription.transfer.count |
Counter |
Скорость перекладки событий |
|
dspc.subscription.partition.state |
Gauge |
|
Статус обработки партиций — количество по T_DSPC_SYS_SUBSCRIPTION_WORKER в разрезе state и subscriptionId |
dspc.subscription.event.queue.size |
Gauge |
|
Количество элементов очереди обработки событий в таблице T_DSPC_SYS_EVENT_QUEUE_ITEM в разрезе подписок и статусов. |
dspc.subscription.new.events.count |
Gauge |
|
Отображает количество событий со статусом |
Метрики dspc.subscription.partition.state, dspc.subscription.event.queue.size и dspc.subscription.new.events.count обновляются фоновым scheduled-процессом. Частота обновления
данных метрик регулируется настройкой:
dataspace.subscription.metrics.schedule-delay-ms
#Значение по умолчанию — 10000 миллисекунд
API повторной отправки событий#
REST API POST /admin/api/subscriptions/<id>/resend позволяет повторно отправить сообщения для событий, которые были уже обработаны ранее.
Внимание!
Переотправка в topic Kafka допустима только при единственной системе, читающей topic, по инициативе этой системы. Все остальные читатели могут получить ранее обработанные ими данные, в результате чего могут образоваться дубли.
Примечание
Переотправка возможна только для тех событий, по которым присутствуют записи в таблицах событий и очереди публикации, то есть они еще не были очищены. Необходимо задавать глубину очистки очередей событий с учетом требований по их возможной переотправке.
Примечание
При наличии в подписке GraphQL-query при повторной отправке он будет выполнен повторно на текущих данных. Как следствие — данные публикации могут отличаться от предыдущего вызова. Если для каких-то данных это критично, то необходимо завести соответствующие атрибуты в самом событии и заполнять их при создании события.
Примечание
Переотправка невозможна для GraphQL-подписок.
Параметры:
id— идентификатор подписки;тело запроса формата json:
{ "filter" : "root.fromReference=='2' && root.referenceName=='client'", "timeRange" : { "from" : "2023-11-29T10:39:46.587", "to" : "2023-12-01T10:39:46.587" } }filter— фильтр по атрибутам события, написанный на языке строковых выражений;timeRange.from,timeRange.to— временной диапазон (учитывается полеcreationTimestampсущности события) для выбора событий, сообщения по которым будут отправлены повторно.
Требования к запросу:
Один из параметров
filterилиtimeRangeобязательно должен быть заполнен, иначе API вернет ошибку405 Method Not Allowedс сообщением «Не задано ни одного условия отбора событий».Если задан параметр
timeRange, тоtoиfromобязательны к заполнению. Еслиtoилиfromне заполнены, то API вернет ошибку400 Bad Requestс сообщением «Поле timeRange#from обязательно для заполнения» или «Поле timeRange#to обязательно для заполнения».
При успешном вызове API возвращает код 200 OK и количество событий, по которым были повторно отправлены сообщения. Если не было найдено не одного события, подходящего под условие выборки, сформированного на основе переданных параметров, то API вернет ошибку 404 Not Found c сообщением «Не найдено подходящих событий для переотправки».
Пример вызова (на основе subscriptions.xml, описанного выше):
curl -d @data.json -H "Content-Type: application/json" -X POST http://localhost:8090/admin/api/subscriptions/mergeSubscription1/resend
data.json:
{
"filter" : "root.$id $in ['7294634218387406849','7294634226977341441'] && root.fromReference=='2' && root.referenceName=='client'",
"timeRange" :
{
"from" : "2023-11-29T10:39:46.587",
"to" : "2023-12-01T10:39:46.587"
}
}
Пример ответа:
5
Внимание!
Если параметры подписки в subscriptions.xml были изменены после первичной публикации события, то при повторной отправке:
сообщение будет направлено на новый url/topic, если были изменены поля
targetи/илиcallback;формат сообщения изменится, если было изменено поле
template;данные сообщения изменятся, если было изменено поле
query;сообщение не будет отправлено, если было изменено поле
criteriaи событие теперь не подходит под новое условие;сообщение будет содержать новый состав заголовков, если было изменено поле
headers.Если на момент первичной публикации события для подписки не был задан атрибут
idempotenceHeaderName, то и при повторной отправке соответствующий заголовок передан не будет, даже если к тому моменту атрибутidempotenceHeaderNameбудет добавлен в подписку.
Клиентский путь#
Порядок действий для подключения функционала subscription:
Определить список событий для публикации.
Добавить в проекте DataSpace в модель данных model.xml соответствующие этим событиям
eventс необходимым для публикации набором атрибутов.Создать в проекте DataSpace конфигурацию подписок subscriptions.xml, предусмотрев необходимые параметры для стендозависимой параметризации.
В конфигурации подписок описать шаблон трансформации данных события (и результата выполнения
query) в JSON для публикации.Сформировать файлы
customer-properties-subscription.yamlиcustom_property.conf.yml, тем самым определив стендозависимую конфигурацию.Выполнить сборку проекта DataSpace.
Доработать приложение для создания событий в необходимых узлах бизнес-процесса (вместе с изменением бизнес-данных или независимо).
Подготовить интеграционное окружение для публикации: кластер Kafka, topics, манифесты Kubernetes и Istio для сетевых взаимодействий, конфигурации Secret Management System для получения и ротации сертификатов и другое.
Развернуть DataSpace и приложение, протестировать интеграцию.
Провести нагрузочное тестирование, по результатам уточнить параметры масштабирования сервиса.
Выпустить релиз приложения совместно с DataSpace.
Пример: передача в «Каталог продуктов» сведений об изменении клиента по продукту при слиянии/разъединении#
Входное состояние: приложение развернуто с DataSpace, настроено слияние/разъединение Master Data Management.
Порядок действий:
Создать модель с событием слияния. model.xml:
<model name="rko-product" version="0.1"> <external-types> <external-type type="ClientEPK" merge-kind="Organization"/> </external-types> <class name="RKOProduct"> <property name="curISO" type="String" length="3"/> <property name="contractNumber" type="String" length="64"/> <reference name="client" type="ClientEPK" description="По этой ссылке выполняется слияние"/> </class> <!-- добавляем в модель событие слияния/разъединения client в RKOProduct --> <event name="RKOMergeEvent" merge-event="true"> <property name="rko" type="RKOProduct" parent="true"/> </event> </model>Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:
<subscriptions xmlns="DataspaceSubscriptions"> <subscription id="mergeRKOForPPK" name="MergeRKOForPPK" description="Событие о слиянии клиента продукта для каталога продуктов" target="KAFKA" eventType="RKOMergeEvent" callback="PPK_KAFKA:${rko.merge.subscription.1.topic}" validTill="9999-12-31T23:59:59.999Z" async="false" blocking="true"> <criteria> coalesce(root.mergeFailed, false) != true </criteria> <query> { searchRKOProduct(cond: "it.$id=='${rko}'") { elems { id contractNumber curISO client { entityId } } } } </query> <!-- структура данных на входе преобразования --> <!-- { "event": { "objectId": "someEventId11111 ", "creationTimestamp": "2023-04-01T22:22:22.001Z", "type": "RKOMergeEvent", "status": "processed", "lastChangeDate": "2023-04-01T22:22:23.551Z", "ownerId": "someOwnerId01", "rko": "1231415534646745", "mergeKind": "Organization", "referenceName": "client", "fromReference": "1999449494944077", "toReference": "1999449494944942" }, "data": { "searchRKOProduct": { "elems": { "id": "1231415534646745", "contractNumber": "123141553464", "curISO": "RUB", "client": "1999449494944942" } } } } --> <!-- Ожидаемый результат --> <!-- { "Contract": { "ProductCode": "RKO", "ContractID": "1231415534646745", "epkOrgId": "1999449494944942", "ContractNumber": "123141553464", "CurrencyIso": "RUB" } } В примере преобразования в качестве ProductCode подставляется константа "RKO" --> <template> [ { "operation": "shift", "spec": { "event": { "rko": "Contract.ContractID", "toReference": "Contract.epkOrgId" }, "data": { "searchRKOProduct": { "elems": { "*": { "contractNumber": "Contract.ContractNumber", "curISO": "Contract.CurrencyIso" } } } } } }, { "operation": "default", "spec": { "Contract": { "ProductCode": "RKO" } } } ] </template> <headers> -XTenantId=${rko.merge.subscription.1.tenantId} </headers> </subscription> </subscriptions>Настроить подключение к Kafka для публикации событий.
Определить стендозависимую конфигурацию. customer-properties-subscription.yaml:
kind: ConfigMap apiVersion: v1 metadata: name: customer-properties data: customer.properties: |- rko.merge.subscription.1.topic={{ mergeRKOForPPK.topic.value }} rko.merge.subscription.1.tenantId={{ mergeRKOForPPK.tenantId.value }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.bootstrap-servers={{ mergeRKOForPPK.kafka.bootstrap.servers }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer dataspace.subscription.publisher.kafka.config.PPK_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer dataspace.subscription.publisher.kafka.config.PPK_KAFKA.acks={{ mergeRKOForPPK.kafka.acks }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.delivery-timeout-ms={{ mergeRKOForPPK.kafka.delivery.timeout.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.request-timeout-ms={{ mergeRKOForPPK.kafka.request.timeout.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.retry-backoff-ms={{ mergeRKOForPPK.kafka.retry.backoff.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.linger-ms={{ mergeRKOForPPK.kafka.linger.ms }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.enable-idempotence={{ mergeRKOForPPK.kafka.enable.idempotence }} dataspace.subscription.publisher.kafka.config.PPK_KAFKA.max-in-flight-requests-per-connection={{ mergeRKOForPPK.kafka.max.in.flight.requests.per.connection }}custom_property.conf.yml:
mergeRKOForPPK.topic.value=topic-name mergeRKOForPPK.tenantId.value=tenantId mergeRKOForPPK.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka> mergeRKOForPPK.kafka.acks=all mergeRKOForPPK.kafka.delivery.timeout.ms=5000 mergeRKOForPPK.kafka.request.timeout.ms=200 mergeRKOForPPK.kafka.retry.backoff.ms=500 mergeRKOForPPK.kafka.linger.ms=0 mergeRKOForPPK.kafka.enable.idempotence=true mergeRKOForPPK.kafka.max.in.flight.requests.per.connection=5 # ...Собрать и развернуть приложение.
Выполнить слияние.
Пример: уведомление о состоянии обработки заявки#
Порядок действий:
Создать модель с событием изменения статуса заявки. model.xml:
<model name="rko-product" version="0.2"> <external-types> <external-type type="ClientEPK" merge-kind="Individual"/> </external-types> <enum name="ApplicationStatus"> <value name="DRAFT"/> <value name="SUBMITTED"/> <value name="APPROVED"/> <value name="REJECTED"/> <value name="CANCELLED"/> </enum> <class name="Application"> <property name="code" type="String" length="32"/> <property name="name" type="String" length="254"/> <property name="ref" type="String" length="64" unique="true"/> <reference name="client" type="ClientEPK" description="По этому референсу выполняется слияние/разъединение"/> <property name="content" type="Text"/> <property name="applicationStatus" type="String" length="32"/> </class> <!-- добавляем в модель событие изменения статуса заявки --> <event name="StatusChangeEvent"> <property name="application" type="Application" parent="true" /> <property name="reason" type="String" length="2048" description="Причина изменения статуса"/> <property name="eventUser" type="String" length="254" description="Идентификатор пользователя, действия которого привели к изменению статуса" /> </event> </model>Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:
<subscriptions xmlns="DataspaceSubscriptions"> <subscription id="applicationStatusNotify" name="applicationStatusNotify" description="Событие об изменении статуса заявки" target="REST" eventType="StatusChangeEvent" callback="${baseWebHookUrl}/api/v1/statusNotify" validTill="9999-12-31T23:59:59.999Z" maxRetryAttempts="${applications.subscription.1.nretry}" timeoutMs="${applications.subscription.1.timeout}" retryDelayMs="${applications.subscription.1.retrydelay}" async="true" blocking="true"> <query> { searchApplication(cond: "it.$id=='${application}'") { elems { id code name ref applicationStatus } } } </query> <!-- структура данных на входе преобразования --> <!-- { "event": { "objectId": "someEventId555", "creationTimestamp": "2023-04-01T22:22:22.001Z", "type": "StatusChangeEvent", "status": "processed", "lastChangeDate": "2023-04-01T22:22:23.551Z", "ownerId": "someOwnerId02", "application": "1231415534646745", "reason": "Application successfully passed all checks and has been approved", "eventUser": "xxx-uuuuuuu-zzz" }, "data": { "searchApplication": { "elems": { "id": "1231415534646745", "code": "APP-X333", "name": "Application for grant", "ref": "APP-X333-113333332", "applicationStatus": "APPROVED" } } } } --> <!-- Ожидаемый результат --> <!-- { "ApplicationReference": "APP-X333-113333332", "ApplicationCode": "APP-X333", "ApplicationName": "Application for grant", "Status": "APPROVED", "Reason": "Application successfully passed all checks and has been approved", "Timestamp": "2023-04-01T22:22:22.001Z" } --> <template> [ { "operation": "shift", "spec": { "event": { "reason": "Reason" "creationTimeStamp": "Timestamp" }, "data": { "searchApplication": { "elems": { "*": { "code": "ApplicationCode", "name": "ApplicationName", "ref": "ApplicationReference", "applicationStatus": "Status" } } } } } } ] </template> <headers> -XTenantId=${applications.subscription.1.tenantId} -XchangeUser=${eventUser} </headers> </subscription> </subscriptions>Сконфигурировать подключение к REST API для публикации.
Определить стендозависимую конфигурацию. customer-properties-subscription.yaml:
kind: ConfigMap apiVersion: v1 metadata: name: customer-properties data: customer.properties: |- baseWebHookUrl={{ applicationStatusNotify.baseWebHookUrl.value }} applications.subscription.1.nretry=1 applications.subscription.1.timeout=10000 applications.subscription.1.retrydelay={{ applicationStatusNotify.retrydelay.value }} applications.subscription.1.tenantId={{ applicationStatusNotify.tenantId.value }}custom_property.conf.yml:
applicationStatusNotify.baseWebHookUrl.value=http://localhost:8082 applicationStatusNotify.retrydelay.value=2000 applicationStatusNotify.tenantId.value=tenantId # ...Собрать DataSpace.
Задать стендозависимые параметры.
Развернуть приложение.
Вызвать
/packetAPI DataSpace с командойcreateдляeventи командой установкиapplicationStatusв «APPROVED».{ "jsonrpc": "2.0", "method": "execute", "id": 1, "params": { "packet": { "commands": [ { "id": "updateApplication", "name": "update", "params": { "type": "Application", "id": "<идентификатор_заявки>", "applicationStatus": "APPROVED" } }, { "id": "createStatusChangeEvent", "name": "create", "params": { "type": "StatusChangeEvent", "application": "<идентификатор_заявки>", "reason": "заявка обработана", "eventUser": "<идентификатор_пользователя>" } } ] } } }
Пример: публикация в topic Kafka события изменения суммы на счету AccountBalanceChangeEvent#
Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».
В модель предметной области добавить событие:
<!-- добавляем дочерний AccountBalanceChangeEvent -->
<event name="AccountBalanceChangeEvent" extends="BaseChangeEvent">
<property name="account" type="Account" parent="true"/>
<parents-property name="balance" />
</event>
Создать подписку:
<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="AccountBalanceChange" name="AccountBalanceChange"
description="Публикация события об изменении суммы счета" validTill="2025-01-01T00:00:00.000Z"
target="KAFKA" callback="PPK_KAFKA:${project.account.subscription.1.topic}"
eventType="AccountBalanceChangeEvent"
async="false" blocking="true">
<query>
{
searchAccount (cond: "it.id == ${account}") {
elems {
number
balance {
value
currency
}
}
}
}
</query>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountAmountChangeEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "12314155346467451",
"sysVersion": 3,
"sysTimeChanged": "2023-04-01T22:22:23.551Z"
},
"data": {
"searchAccount":{
"elems": [
{
"number": "40817810500000000223"
"balance": {
"value": 100,
"currency": "810"
}
}
]
}
}
}
-->
<!-- Ожидаемый результат
{
"changeNo" : 3,
"timeChanged" : "2023-04-01T22:22:23.551Z",
"amount" : 100,
"currency" : "810",
"account" : "40817810500000000223"
}
-->
<template>
[
{
"operation": "shift",
"spec": {
"event": {
"sysVersion": "changeNo",
"sysTimeChanged": "timeChanged"
},
"data": {
"searchAccount": {
"elems": {
"*" : {
"number": "account",
"balance": {
"value": "amount",
"currency": "currency"
}
}
}
}
}
}
}
]
</template>
</subscription>
</subscriptions>
Пример: публикация в topic Kafka события изменения состояния счета AccountStatusChangeEvent#
Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».
В модель предметной области добавить событие:
<!-- добавляем дочерний AccountStatusChangeEvent -->
<event name="AccountStatusChangeEvent" extends="BaseChangeEvent">
<property name="account" type="Account" parent="true"/>
<parents-property name="statusForAccounting" />
</event>
Создать подписку:
<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="AccountStatusChange" name="AccountStatusChange"
description="Публикация события об изменении статуса счета" validTill="2025-01-01T00:00:00.000Z"
target="KAFKA" callback="LOCAL_KAFKA1:${project.account.subscription.2.topic}"
eventType="AccountStatusChangeEvent"
async="false" blocking="true">
<query>
{
searchAccount (cond: "it.id == ${account}") {
elems {
number
statusForAccounting {
code
}
}
}
}
</query>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountStatusChangeEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "1231415534646745",
"sysVersion": 3,
"sysTimeChanged": "2023-04-01T22:22:23.551Z"
},
"data": {
"searchAccount":{
"elems": [
{
"number": "40817810500000000223",
"statusForAccounting": {
"code"
}
}
]
}
}
}
-->
<!-- Ожидаемый результат
{
"changeNo": 3,
"timeChanged": "2023-04-01T22:22:23.551Z",
"status": "frozen",
"account": "40817810500000000223"
}
-->
<template>
[
{
"operation": "shift",
"spec": {
"event": {
"sysVersion": "changeNo",
"sysTimeChanged": "timeChanged"
},
"data": {
"searchAccount": {
"elems": {
"*" : {
"number": "account",
"statusForAccounting": {
"code":"status"
}
}
}
}
}
}
}
]
</template>
</subscription>
</subscriptions>
Пример: репликация состояния счета в OpenSearch за счет публикации события AccountObjectEvent#
Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».
В модель предметной области добавить событие:
<!-- добавляем дочерний AccountObjectEvent -->
<event name="AccountObjectEvent" extends="BaseObjectEvent">
<property name="account" type="Account" parent="true"/>
</event>
Создать подписку на создание и изменение счета и подписку на удаление счета:
<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="AccountObjectEvent" name="AccountObjectEvent"
description="Репликация состояния счета в OpenSearch" validTill="2025-01-01T00:00:00.000Z"
target="REST" callback="PUT ${project.account.subscription.3.url}/_doc/${account}"
maxRetryAttempts="10"
timeoutMs="5000"
retryDelayMs="10000"
eventType="AccountObjectEvent"
async="false" blocking="true">
<criteria>root.sysObjectEvent != 'D'</criteria>
<query>
{
searchAccount (cond: "it.id == ${account}") {
elems {
number
description
accountType
balance {
value
currency
}
client {
entityId
}
statusForAccounting {
code
}
branch {
code
location
}
statementInfo {
periodicity
title
}
tags {
elems
}
postings {
elems {
amount
}
}
}
}
}
</query>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountObjectEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "1231415534646745",
"sysVersion": 3,
"sysTimeChanged": "2023-04-01T22:22:23.551Z",
"sysObjectEvent": "C"
},
"data": {
"searchAccount":{
"elems": [
{
"number": "40817810500000000223",
"description": "Personal Account",
"accountType": "INDCUR",
"balance": {
"value" : 100.0,
"currency": "978"
},
"client": {
"entityId": "0123445"
},
"statusForAccounting": {
"code": "active"
},
"branch": {
"code": "ABBC",
"location": "Country, City, Street, Block"
},
"statementInfo": {
"periodicity": 30,
"title": "Account statement"
},
"tags": {"elems" : ["personal", "salary"]},
"postings": {"elems": [ {"amount":100.0},{"amount":-50.0}] }
}
]
}
}
}
-->
<!-- Ожидаемый результат
{
"number" : "40817810500000000223",
"description" : "Personal Account",
"accountType" : "INDCUR",
"balance" : {
"amount" : 100,
"currency" : "978"
},
"client" : "0123445",
"statusForAccounting" : {
"code" : "active"
},
"branch" : {
"code" : "ABBC",
"location" : "Country, City, Street, Block"
},
"statement" : {
"periodicity" : 30,
"title" : "Account statement"
},
"tags" : ["personal", "salary" ],
"postings" : [ { "amount" : 100.0}, {"amount" : -50.0} ],
"sysVersion" : 3,
"sysTimeChanged" : "2023-04-01T22:22:23.551Z"
}
-->
<template>
[
{
"operation": "shift",
"spec": {
"data": {
"searchAccount": {
"elems": {
"*" : {
"number": "number",
"description": "description",
"accountType": "accountType",
"balance": {
"value": "balance.amount",
"currency": "balance.currency"
},
"client": "client",
"statusForAccounting": "statusForAccounting",
"branch": "branch",
"statementInfo": "statement",
"tags": {"elems": "tags"},
"postings": {"elems":"postings"}
}
}
}
},
"event": {
"sysVersion": "sysVersion",
"sysTimeChanged": "sysTimeChanged"
}
}
}
]
</template>
</subscription>
<!-- Подписка на удаление -->
<subscription id="AccountObjectDeleteEvent" name="AccountObjectDeleteEvent"
description="Репликация удаления счета в OpenSearch" validTill="2025-01-01T00:00:00.000Z"
target="REST" callback="DELETE ${project.account.subscription.3.url}/_doc/${account}"
maxRetryAttempts="10"
timeoutMs="5000"
retryDelayMs="10000"
eventType="AccountObjectEvent"
async="false" blocking="true">
<criteria>root.sysObjectEvent == 'D'</criteria>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountObjectEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "1231415534646745",
"sysVersion": 3,
"sysTimeChanged": "2023-04-01T22:22:23.551Z",
"sysObjectEvent": "D"
}
}
-->
</subscription>
</subscriptions>
Пример: публикация события AccountAmountAndStatusTracking в topic Kafka#
Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».
В модель предметной области добавить событие:
<!-- добавляем дочерний AccountTrackBalanceAndStatusEvent -->
<event name="AccountTrackBalanceAndStatusEvent" extends="BaseTrackingEvent">
<property name="account" type="Account" parent="true"/>
<parents-property name="balance" />
<parents-property name="statusForAccounting.code" rename="statusCode"/>
</event>
Создать подписку:
<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="AccountTrackBalanceAndStatusEvent" name="AccountTrackBalanceAndStatusEvent"
description="Публикация события об изменении суммы счета" validTill="2025-01-01T00:00:00.000Z"
target="KAFKA" callback="LOCAL_KAFKA1:${project.account.subscription.4.topic}"
eventType="AccountTrackBalanceAndStatusEvent"
async="false" blocking="true">
<query>
{
searchAccount (cond: "it.id == ${account}") {
elems {
number
}
}
}
</query>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountTrackAmountAndStatusEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "12314155346467451",
"sysVersion": 3,
"sysTimeChanged": "2023-04-01T22:22:23.551Z",
"sysObjectEvent": "U",
"sysChangeUser": "P01234412",
"balance": {
"value": 100,
"currency": "810"
},
"statusCode": "open"
},
"data": {
"searchAccount":{
"elems": [
{
"number": "40817810500000000223"
}
]
}
}
}
-->
<!-- Ожидаемый результат
{
"changeNo" : 3,
"timeChanged" : "2023-04-01T22:22:23.551Z",
"amount" : 100,
"currency" : "810",
"status": "open",
"account" : "40817810500000000223",
"user": "P01234412"
}
-->
<template>
[
{
"operation": "shift",
"spec": {
"event": {
"sysVersion": "changeNo",
"sysTimeChanged": "timeChanged",
"balance": {
"value": "amount",
"currency": "currency"
},
"statusCode": "status",
"sysChangeUser": "user"
},
"data": {
"searchAccount": {
"elems": {
"*" : {
"number": "account"
}
}
}
}
}
}
]
</template>
</subscription>
</subscriptions>
Пример: аудит изменений счета — публикация события AccountSnapshotEvent в Аудит#
Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».
В модель предметной области добавить событие:
<!-- добавляем дочерний AccountSnapshotEvent -->
<event name="AccountSnapshotEvent" extends="BaseSnapshotEvent" snapshot-large-properties="true">
<property name="account" type="Account" parent="true"/>
<parents-property name="statusForAccounting.code" rename="statusCode"/>
<parents-property name="branch.code" rename="branchCode"/>
<parents-property name="status" rename="accountStatus"/>
<parents-property name="statementInfo.periodicity" rename="statementPeriodicity"/>
</event>
Создать подписку:
<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="AccountSnapshotEvent" name="AccountSnapshotEvent"
description="Аудит всех изменений счета" validTill="2025-01-01T00:00:00.000Z"
target="REST" callback="${project.account.subscription.5.url}/audit/postEvent"
maxRetryAttempts="10"
timeoutMs="5000"
retryDelayMs="10000"
eventType="AccountSnapshotEvent"
async="false" blocking="true">
<criteria>root.sysObjectEvent != 'D'</criteria>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountObjectEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "1231415534646745",
"sysVersion": 3,
"sysTimeChanged": "2023-04-01T22:22:23.551Z",
"sysObjectEvent": "C",
"sysChangeUser": "P01234412",
"accountGroup": "553453453555",
"accountType": "INDCUR",
"number": "40817810500000000223",
"balance": {
"value" : 100.0,
"currency": "978"
},
"statementInfo": "61231313132312312",
"branch": "1231313132312312",
"client": "0123445",
"description": "Personal Account",
"hash": "AFFFCD02E1",
"accountStatus":"active",
"product": "345355555345553",
"branchCode": "1234",
"statusForAccounting":"4",
"statusCode": "open",
"statementPeriodicity": 7
}
}
-->
<!-- Ожидаемый результат
{
"userLogin" : "P01234412",
"params" : {
"_changeType" : "C",
"accountId" : "1231415534646745",
"number" : "40817810500000000223",
"description" : "Personal Account",
"accountType" : "INDCUR",
"amount" : 100,
"currency" : "978",
"client" : "0123445",
"statusCode" : "open",
"branch" : "1234",
"accountStatus" : "active",
"statementPeriodicity" : 7,
"checksum": "AFFFCD02E1"
},
"name" : "AccountChange",
"userNode" : null,
"metamodelVersion" : "0.1",
"module" : "DSPC",
"userName" : null,
"session" : null
}
-->
<template>
[
{
"operation": "shift",
"spec":{
"event": {
"createdAt": "sysTimeChanged",
"sysChangeUser": "userLogin",
"sysVersion": "params.sysVersion",
"sysObjectEvent": "params._changeType",
"account": "params.accountId",
"number": "params.number",
"description": "params.description",
"accountType": "params.accountType",
"balance": {
"value": "params.amount",
"currency": "params.currency"
},
"client": "params.client",
"statusCode": "params.statusCode",
"branchCode": "params.branch",
"accountStatus": "params.accountStatus",
"statementPeriodicity": "params.statementPeriodicity",
"hash": "params.checksum"
}
}
},
{
"operation": "default",
"spec": {
"metamodelVersion": "0.1",
"name": "AccountChange",
"session": null,
"module": "DSPC",
"userName": null,
"userNode": null
}
}
]
</template>
</subscription>
</subscriptions>
Пример: публикация события об изменении истории счета в topic Kafka#
Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».
В модель предметной области добавить событие:
<event name="AccountHistoryEvent" extends="BaseHistoryEvent">
<property name="account" type="Account" parent="true"/> <!-- Примечание: Тип `Account` историцируемый, т.к. есть историцируемые поля -->
</event>
Создать подписку:
<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
<subscription id="AccountHistory" name="AccountHistory" description="Публикация события об изменении истории счета" validTill="2025-01-01T00:00:00.000Z"
target="KAFKA" callback="LOCAL_KAFKA1:${project.account.subscription.1.topic}"
eventType="AccountHistoryEvent"
async="false" blocking="true">
<query>
{
merge {
elems {
... on Account @mergeReqSpec(cond: "it.$id=='${account}'") {
id
number
accountType
}
... on AccountHistory @mergeReqSpec(cond: "it.sysHistoryOwner.id=='${account}' && it.sysHistNumber==${sysVersion}") {
sysHistNumber
balance {
value
sysValueUpdated
currency
sysCurrencyUpdated
}
}
}
}
}
</query>
<!-- структура данных на входе преобразования
{
"event": {
"objectId": "someEventId11111 ",
"creationTimestamp": "2023-04-01T22:22:22.001Z",
"type": "AccountHistoryEvent",
"status": "processed",
"lastChangeDate": "2023-04-01T22:22:23.551Z",
"ownerId": "someOwnerId01",
"account": "1231415534646745",
"sysVersion": 3,
"timeChanged": "2023-04-01T22:22:23.551Z"
},
"data": {
"merge":{
"elems": [
{
"id": "1231415534646745",
"number": "40817810500000000223",
"accountType": "CACC"
},
{
"sysHistNumber": 4,
"balance": {
"value": 1000.00,
"sysValueUpdated": true,
"currency": null,
"sysCurrencyUpdated": false
}
}
]
}
}
}
-->
<!-- Ожидаемый результат
{
"changeNo": 3,
"timeChanged": "2023-04-01T22:22:23.551Z",
"account": "40817810500000000223",
"amount": 1000.0,
"currency": null
}
-->
<template>
[
{
"operation": "shift",
"spec": {
"event": {
"sysVersion": "changeNo",
"timeChanged": "timeChanged"
},
"data": {
"merge": {
"elems": {
"*" : {
"number": "account",
"balance": {
"value": "amount",
"currency": "currency"
}
}
}
}
}
}
}
]
</template>
</subscription>
</subscriptions>