Руководство по работе с сервисом DataSpace Subscription#

О документе#

В составе продукта Platform V DataSpace (APT) поставляется сервис DataSpace Subscription, реализующий универсальный (повторно используемый разными потребителями) механизм, позволяющий DataSpace участвовать в событийном обмене (выступать в роли провайдера событий).

В данном документе приведено описание настройки сервиса DataSpace Subscription и работы с ним.

Расшифровку основных понятий см. в перечне «Термины и определения».

Описание механизма Subscription#

События#

Предусмотрена возможность формирования событий (event) по видам изменений:

  • Явное создание объекта события в пакете команд и mutation GraphQL.

  • Проведение операции слияния/разъединения для заданного класса.

  • Изменение заданного набора свойств сущности средствами DataSpace.

  • Создание и удаление, а также изменение любого или заданного набора свойств сущности средствами DataSpace.

  • Создание записи в истории для историцируемой сущности при ее изменении средствами DataSpace.

Состав полей события и его привязку к другим объектам определяет потребитель DataSpace в модели своей предметной области при помощи элементов event.

Подписки и публикация событий#

Для публикации событий в другие системы создаются подписки на события. В подписке задаются следующие параметры:

  • тип события;

  • способ публикации — через topic Kafka или через webhook (вызов REST API);

  • спецификация преобразования данных события в тело сообщения;

  • GraphQL-запрос (query), который должен быть выполнен при публикации соответствующего события для получения дополнительных данных с целью публикации;

  • дополнительный фильтр по данным события (если фильтр не вернул данные, то публикация не осуществляется);

  • тайм-аут и количество retry при публикации через webhook;

  • возможность определения дополнительных заголовков (headers), содержимое которых может быть задано явно или параметризовано данными события (event).

Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml (см. раздел «Спецификация подписок»).

Тело сообщения для публикации представляет собой JSON, формируемый по заданному в подписке преобразованию из JSON-данных event и полей из результирующего query. По умолчанию для преобразования из JSON в JSON используется библиотека JOLT.

Примечание

При необходимости можно воспользоваться демо-сайтом JOLT трансформации.

Помимо публикации в другие системы можно подписаться на события с использованием GraphQL subscription API. Описание приведено в разделе «Подписка» руководства «Протокол GraphQL».

Порядок подключения функциональности subscription с примерами приведен в разделе «Клиентский путь».

Диаграмма событийного обмена#

@startuml
title Публикация событий
participant "Приложение" as app
participant "DataSpace-Core" as core
database "DataSpace DB" as db
participant "DataSpace-Subscription" as subs
queue "topic Kafka" as topic
participant "Подписчик 1" as subscriber1
participant "Подписчик N" as subscribern
app -> core : пакет с командами\ncreateEvent
core -> db : сохранить\nсобытие
db -> subs : прочитать\nновые события
subs -> db : выполнить запрос\nзаданный в подписке 1
subs -> topic : опубликовать\nсобытие через\ntopic Kafka 
topic -> subscriber1 : прочитать\nсобытие  
subs -> db : выполнить запрос\nзаданный в подписке N
subs -> subscribern : опубликовать\nсобытие через\nREST API
subs -> db : по прошествии\nзаданного периода\nудалить все данные\nсобытия и публикации 
@enduml

Виды событий#

Существует два основных вида событий: формируемые явно (далее — явные события) и формируемые автоматически (далее — автоматические события).

Явные события#

Явные события используются с целью создания в транзакции произвольных событий с собственными наборами данных для их последующей публикации.

Событие необходимо создавать явно в составе пакета команд.

Данный вид событий объявляется в модели данных при помощи элемента event.

Для события в модели данных объявляется набор его свойств, которым при создании задаются значения.

Автоматические события#

Автоматические события создаются DataSpace неявно при выполнении определенных действий над данными, которые зависят от типа события.

Чтобы DataSpace создал автоматическое событие, его необходимо объявить в модели данных в качестве дочернего события к сущности, действия над которой требуется отслеживать. Такое событие может быть унаследовано от родительского события одного из базовых типов: BaseMergeEvent, BaseChangeEvent, BaseObjectEvent, BaseTrackingEvent, BaseSnapshotEvent, BaseHistoryEvent. Базовый тип определяет условия для формирования события:

Родительский тип события

Действие, приводящее к формированию события

Решаемая задача

Пример

Состав данных события

BaseMergeEvent

Изменение данных при обработке события слияния/разъединения

Информирование о произведенных фоновым процессом DataSpace изменениях в данных

При дедубликации клиентов необходимо производить перепривязку продуктов в CRМ-системе. В модели данных для сущности Product создается дочернее событие, унаследованное от BaseMergeEvent. В конфигурации DataSpace создается подписка для публикации события в topic Kafka. CRM-система подписывается на topic и обрабатывает события

Метка времени изменения, версия объекта, ссылка на родительскую сущность, имя модифицированного поля, его новое и предыдущее значения

BaseHistoryEvent

Создание записи в истории сущности

Публикация истории изменений сущности без дублирования данных истории в самом событии

Для сущности Account ведется история по полям status и amount. Необходимо формировать события при изменений истории (то есть значений status и amount) и отправлять их в topic Kafka. В модели данных для сущности Account создается дочернее событие, унаследованное от BaseHistoryEvent. В конфигурации DataSpace создается подписка для публикации события в топик Kafka

Метка времени изменения, версия объекта, ссылка на родительскую сущность

BaseChangeEvent

Изменение заданных полей объекта

Мониторинг изменений полей

На экране отображаются котировки акций. Необходимо их обновлять в реальном времени. В модели данных к сущности Quote создается дочернее событие QuoteChangeEvent, унаследованное от BaseChangeEvent. В приложении создается GraphQL subscription на QuoteChangeEvent с фильтром по отображаемым на экране котировкам

Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта

BaseObjectEvent

Создание и удаление объекта, изменение любого поля

Репликация объекта

Поисковый движок используется для полнотекстового и векторного поиска объектов, ведущихся в DataSpace. Необходимо поддерживать в нем актуальное состояние объектов. В модели данных к сущности создается дочерний BaseObjectEvent. Для этого события создается REST-подписка для вызова API загрузки данных поискового движка. В подписке для выборки релевантных атрибутов используется GraphQL-запрос текущего состояния объекта

Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта, действие — Create/Update/Delete

BaseTrackingEvent

Создание и удаление объекта, изменение заданных полей

Аудит изменения избранных полей

В составе объекта есть несколько значимых атрибутов, все изменения по которым надо отслеживать. Например, остаток по счету. Создается дочерний BaseTrackingEvent для поля balance. В конфигурации DataSpace создается подписка для публикации события в topic Kafka. События из topic вычитываются и по ним в витрине данных на каждое изменение остатка создается отдельная запись движения по счету

Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта, действие — Create/Update/Delete, идентификатор пользователя, значения заданных полей сущности

BaseSnapshotEvent

Создание и удаление объекта, изменение любого поля

Аудит изменения объекта. Снапшоты версий объекта. Undo/Redo

Требуется организовать аудит изменения данных. Для интересующих сущностей создаются дочерние события BaseSnapshotEvent. В конфигурации DataSpace создаются подписки для публикации событий в topic Kafka системы Аудита

Метка времени изменения, версия объекта, ссылка на родительскую сущность, ID объекта, действие — Create/Update/Delete, идентификатор пользователя, значения всех полей сущности (кроме коллекций)

Для отсылки к полям родительской сущности используется специальный элемент модели — parents-property.

Для отслеживания изменения полей родительской сущности в событиях, унаследованных от BaseChangeEvent, эти поля объявляют при помощи parents-property.

В событиях, унаследованных от BaseTrackingEvent, для этих полей будут созданы собственные поля события, в которых будут сохранены значения по завершении транзакции.

В событиях, унаследованных от BaseSnapshotEvent, будут созданы собственные поля для всех неколлекционных полей родительской сущности, в которых будет сохранено состояние объекта по завершении транзакции.

В событиях, унаследованных от BaseTrackingEvent и BaseSnapshotEvent, при помощи parents-property с атрибутом rename можно задать дополнительные поля, значения которых фиксируются при создании события, например, поля справочников по ссылке.

При удалении объекта в событиях, унаследованных от BaseTrackingEvent и BaseSnapshotEvent, сохраняются последние значения полей перед удалением.

Схема типов событий#

Любое событие является наследником типа BaseEvent, который определяет базовый набор атрибутов события.

Представленная ниже диаграмма типов событий иллюстрирует взаимосвязи между типами событий и их базовыми типами.

hide empty methods

class BaseEvent {
  objectId: String
  creationTimestamp: OffsetDateTime
  type: ClassType // не отображется в данных события при публикации
  status: EventStatus // не отображется в данных события при публикации
  lastChangeDate: OffsetDateTime
  ownerId: String
  aggregateRootId: String // не отображется в данных события при публикации
}
class BaseChangeEvent {
 sysVersion: Long = ParentAggregate.version
 sysTimeChanged: OffsetDateTime = ParentAggregate.lastChangeDate
}
BaseEvent <|- BaseChangeEvent

class BaseObjectEvent {
  sysObjectEvent: SysObjectEvent enum {C, U, D}
}
BaseChangeEvent <|- BaseObjectEvent

class BaseTrackingEvent {
  sysChangeUser: String
}
BaseObjectEvent <|- BaseTrackingEvent

class BaseSnapshotEvent {
}
BaseTrackingEvent <|- BaseSnapshotEvent

class ParentAggregate {
  aggregateId: String
  version : Long
  lastChangeDate : DateTime
}

class ParentClass {
  aggregateRootId: String
  parentAttr1
  parentAttr2
  parentAttrN
}
ParentAggregate  o-down-- ParentClass

class ParentChangeEvent {   
  parentClass: ParentClass parent="true" mandatory="true"
// parents-property parentAttr1
// parents-property parentAttr2
} 
ParentChangeEvent -down--|> BaseChangeEvent
ParentClass  *-down- ParentChangeEvent

class ParentObjectEvent {
  parentClass: ParentClass parent="true" mandatory="true"
// parents-property parentAttr1
// parents-property parentAttr2
// parents-property parentAttrN
} 
ParentObjectEvent -down--|> BaseObjectEvent
ParentClass  *-down- ParentObjectEvent

class ParentTrackingEvent {
  parentClass: ParentClass parent="true" mandatory="true"
  parentAttr1
  parentAttr2
} 
ParentTrackingEvent -down--|> BaseTrackingEvent
ParentClass  *-down- ParentTrackingEvent

class ParentSnapshotEvent {
  parentClass: ParentClass parent="true" mandatory="true"
  parentAttr1
  parentAttr2
  parentAttrN
} 
ParentSnapshotEvent -down--|> BaseSnapshotEvent
ParentClass  *-down- ParentSnapshotEvent

Требования к модели данных#

В модели данных необходимо описать события (event), которые будут обрабатываться при работе сервиса DataSpace Subscription.

Явные события в модели данных#

Явное событие может выступать как в роли самостоятельного агрегата, так и являться дочерним объектом одного из существующих агрегатов в модели — для этого у события должна быть объявлена ссылка на родительский класс (размеченная атрибутом parent=»true»). Событие, у которого отсутствует родительская ссылка, является самостоятельным агрегатом.

Данный вид событий необходимо явно создавать в составе пакета команд.

Отличия явного еvent от класса:

  • для него определены только операции создания и поиска;

  • служит только для подписки на него;

  • не наследуется;

  • имеет плоскую структуру;

  • допустима ссылка на родительский класс для формирования агрегатной связи; event, у которого отсутствует родительская ссылка, является самостоятельным агрегатом;

  • обработанный всеми подписками event автоматически удаляется по истечении времени;

  • не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.

Пример описания события, являющегося самостоятельным агрегатом:

<model model-name="MyEventDemo" version="DEV-SNAPSHOT">
    <event name="MyEvent">
        <property name="message" type="String"/>
    </event>
</model>

Пример события, которое входит в агрегат Product:

<model model-name="ProductEventDemo" version="DEV-SNAPSHOT">

  <class name="Product" label="Продукт" lockable="true">
    <property name="name" type="String" label="Наименование"/>
  </class>

  <event name="ProductEvent">
    <property name="message" type="String"/>
    <property name="product" type="Product" parent="true"/>
  </event>
    
</model>

Для свойств событий допустимыми атрибутами являются: name, type, parent, label, length, scale, mandatory, default-value.

Внимание!

При выполнении поисковых операций следует учитывать, что обработанные события по прошествии заданного времени удаляются из БД.

BaseEvent#

Любое событие наследует от абстрактного класса BaseEvent, имеющего следующие свойства:

Свойства BaseEvent:

Свойство

Тип

Описание

objectId

String (254)

ID события

creationTimestamp

OffsetDateTime

Метка времени создания события

type

String (254)

Тип события (имя класса события).
Не отображается в данных события при публикации

status

enum

Статус обработки события:
new — создан,
processed — обработан перекладчиком (конечный статус).
Не отображается в данных события при публикации

lastChangeDate

Date

Время последнего изменения записи

aggregateRootId

String (254)

ID корня агрегата
*Если событие является корнем агрегата — поле aggregateRootId отсутствует, аналогично обычным классам.
Не отображается в данных события при публикации

ownerId

String (254)

ID тенанта

partitionHash

Long

Предрассчитанный хеш для деления по партициям.
Не отображается в данных события при публикации

Автоматические события в модели данных#

Автоматические события, в отличие от явных, не нужно создавать в пакете команд. Они создаются самим модулем DataSpace в ответ на некоторые действия, произошедшие в процессе работы модуля.

Отличия автоматического еvent от класса:

  • для него определена только операции поиска;

  • служит только для подписки на него;

  • не может иметь вручную объявленные свойства;

  • обязательна ссылка на родительский класс для формирования агрегатной связи;

  • обработанный всеми подписками event автоматически удаляется по истечении времени;

  • должно наследоваться от предопределенных базовых типов автоматических событий (подробно будут рассмотрены далее):

    • BaseChangeEvent,

    • BaseObjectEvent,

    • BaseTrackingEvent,

    • BaseSnapshotEvent,

    • BaseMergeEvent,

    • BaseHistoryEvent;

  • не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.

События слияния/разъединения (BaseMergeEvent)#

Отдельным видом событий изменения являются события слияния/разъединения.

Если сконфигурировать подписку на события слияния/разъединения, то при выполнении операции слияния/разъединения сформируется событие слияния/разъединения, которое будет опубликовано в соответствии с подпиской.

Для этого в модели предметной области необходимо объявить event, унаследованный от BaseMergeEvent при помощи атрибут extends со значением BaseMergeEvent. Этот event необходимо связать с классом, содержащим референс, обрабатываемый при слиянии/разъединении.

Внимание!

Способ объявления при помощи признака merge-event="true" является устаревшим, пользоваться им не следует.

Такие event не доступны для создания через пакет команд или mutation GraphQL. В качестве значения тенанта (атрибут ownerId) для таких событий устанавливается значение, равное значению тенанта того агрегата, в рамках которого произошли изменения.

Для событий слияния/разъединения обязательно наличие ссылки на родительский класс, но других собственных свойств для них задать нельзя.

Пример объявления:

<model model-name="MergeEventDemo" version="DEV-SNAPSHOT">
    
    <external-types>
        <external-type type="Client" merge-kind="Organization"/>
    </external-types>
    
    <class name="Product" label="Продукт" lockable="true">
        <property name="name" type="String" label="Наименование"/>
        <property name="code" type="String" label="Код"/>
        <reference name="client" type="Client" label="Идентификатор клиента"/>
    </class>
    
    <event name="ProductClientMergeEvent" extends="BaseMergeEvent">
        <property name="product" type="Product" parent="true"/>
    </event>
    
</model>

Собственные свойства BaseMergeEvent:

Свойство

Тип

Описание

mergeKind

String (254)

Вид слияния — параметр процесса слияния/разъединения

referenceName

String (254)

Имя обрабатываемой при слиянии ссылки

fromReference

String (254)

Предыдущее значение ссылки

toReference

String (254)

Новое значение ссылки

mergeFailed

Boolean

Признак успешного завершения процесса слияния/разъединения («true» — если процесс завершился неудачей, «false» или «null» — если успешно)

errorCode

String (5)

Код ошибки

errorMessage

String (254)

Сообщение об ошибке

События слияния/разъединения создаются автоматически сервисом DataSpace ReferenceUpdater.

Если процесс слияния/разъединения завершился успешно, то формируется event со значением поля mergeFailed = false.

Предусмотрена возможность создания события в том случае, если процесс слияния/разъединения завершился с ошибкой, требующей постороннего вмешательства. За это отвечает настройка сервиса дедубликации duplication.service.enable-failed-merge-events. По умолчанию формирование таких событий выключено: duplication.service.enable-failed-merge-events=false.

Если перевести настройку duplication.service.enable-failed-merge-events сервиса дедубликации в значение true, то при возникновении ошибки процесса слияния/разъединения будут создаваться события со значением поля mergeFailed = true. При этом также заполняются поля errorCode — код ошибки и errorMessage — сообщение об ошибке.

Виды ошибок, при которых формируется event со значением поля mergeFailed = true, предопределены.

Доступен вид ошибок UNIQ — ошибка нарушения уникальности.

Для подписки на неуспешные события слияния/разъединения необходимо использовать фильтр root.mergeFailed == true в поле criteria для подписки.

Примечание

Если функциональность обработки событий слияния/разъединения клиентов первый раз используется потребителем, начиная с версии компонентов DataSpace 1.11.0 и выше, и если настройка сервиса дедубликации duplication.service.enable-failed-merge-events установлена в значение true, то для подписки на успешные события слияния/разъединения достаточно использовать фильтр root.mergeFailed == false в поле criteria для подписки.

Если данная функциональность использовалась до версии 1.11.0, а также если настройка сервиса дедубликации duplication.service.enable-failed-merge-events установлена в значение true, то следует использовать фильтр coalesce(root.mergeFailed, false) != true в поле criteria для подписки.

Примечание

События изменений также будут формироваться при работе сервиса дедубликации, если они отслеживают изменения дедублицируемой ссылки.

Диаграмма событийного обмена при слиянии/разъединении#

@startuml
title Публикация событий при слиянии/разъединении
participant "Master Data Management" as app
participant "DataSpace-StateMachine" as stm
participant "DataSpace-ReferenceUpdater" as reup
database "DataSpace DB" as db
participant "DataSpace-Subscription" as subs
queue "topic Kafka" as topic
participant "Подписчик" as subscriber1
app -> stm : событие\nо слиянии клиентов
stm -> db : создать задачу\nна слияние
db -> reup : получить и выполнить\nзадачу на слияние
reup -> db : найти все\nобъекты для слияния
reup -> db : для каждого объекта\n — заменить значение\nссылки на новое,\nсформировать событие слияния
db -> subs : прочитать\nновые события
subs -> db : выполнить запрос\nзаданный в подписке
subs -> topic : опубликовать\nсобытие через\ntopic Kafka 
topic -> subscriber1 : прочитать\nсобытие  
subs -> db : по прошествии\nзаданного периода\nудалить все данные\nсобытия и публикации 
@enduml

События создания записи историцирования (BaseHistoryEvent)#

Данный тип событий применим только к историцируемым сущностям (см. Историцирование). Для них обязательно наличие ссылки на родительский класс, но других собственных свойств для них задать нельзя. Событие формируется при создании записи в истории родительской сущности. Для родительского типа можно объявить только один event, унаследованный от BaseHistoryEvent. Пример объявления:

<model model-name="HistoryEventDemo" version="DEV-SNAPSHOT">
    
    <class name="Account">
        <property name="number" type="String" length="32" unique="true"/>
        <property name="accountType" type="String" length="4" />
        <property name="currency" type="String" length="3" historical="true" />
        <property name="amount" type="Decimal" length="20" scale="2" historical="true"/>
    </class>
    <!-- создаем дочерний AccountHistoryEvent -->
    <event name="AccountHistoryEvent" extends="BaseHistoryEvent"> 
        <property name="account" type="Account" parent="true"/> <!-- Примечание: Тип `Account` историцируемый, т.к. есть историцируемые поля -->
    </event>

</model>

События отслеживания изменений (BaseChangeEvent, BaseObjectEvent, BaseTrackingEvent, BaseSnapshotEvent)#

События отслеживания изменений формируются в ответ на изменения отслеживаемых классов.

Таким образом дается возможность подписаться на изменение сущностей.

Для этого в модели предметной области необходимо объявить event, указав в качестве типа parent свойства тот класс, изменения которого необходимо отслеживать и формировать события. А также необходимо указать в атрибуте extends один из базовых, предопределенных классов событий изменения сущностей:

  • BaseChangeEvent;

  • BaseObjectEvent;

  • BaseTrackingEvent;

  • BaseSnapshotEvent.

Для каждого из них предусмотрена своя логика формирования событий, описанная в соответствующих подразделах ниже.

Некоторые виды автоматических событий имеют только базовый набор свойств события, тогда как другие могут сохранять частичное или полное состояние сущности, получаемое в результате изменения.

В BaseChangeEvent и BaseTrackingEvent имеется возможность указать, изменения каких именно свойств необходимо отслеживать, при этом событие формируется при изменении любого из отслеживаемых свойств. Для этого в спецификации события event используется специальный элемент parents-property. BaseObjectEvent, BaseTrackingEvent и BaseSnapshotEvent отслеживают не только изменения, но и создание и удаление сущности.

Могут отслеживаться свойства следующих типов:

  • примитивные типы;

  • перечисления;

  • ссылки на другой класс (отслеживается изменение самой ссылки, а не сущности по ссылке);

  • reference;

  • свойства статусной модели (сгенерированные statusFor<наблюдатель>);

  • ссылки на справочники;

  • embeddable типы.

Свойства-коллекции указывать в качестве отслеживаемых нельзя.

Особенности обработки транзакции с созданием событий отслеживания изменений#

Для того чтобы формируемые события с сохранением состояния содержали актуальное значение всех сохраняемых свойств все отслеживаемые сущности блокируются при выполнении пакета (кроме операций создания, т.к. это не имеет смысла). В качестве альтернативы предусмотрена опция, позволяющая вместо блокировки выполнять повторное чтение обновленных сущностей непосредственно перед формированием события. Данную опцию можно включить настройкой dataspace-core.events.lock-updated-entity-for-change-event — значение по умолчанию false, т.е. обновленные сущности считываются повторно. К операциям удаления всегда применяется только блокирование.

При наличии в модели данных пользовательских событий, унаследованных от имеющихся базовых типов классов событий изменения сущности, а также при изменении соответствующих сущностей при выполнении пакета команд, в результате заданные события сформируются автоматически в той же транзакции.

Внимание!

Следует учесть, что событие удаления не будет сформировано, если в результате выполнения пакета команд удаляется корень агрегата, которому принадлежит событие.

Модель данных для иллюстрации примеров работы с событиями отслеживания изменений#

Далее примеры будут базироваться на следующей модели предметной области:

<?xml version="1.0" encoding="UTF-8" ?>
<model model-name="ChangeEventsModel" version="DEV-SNAPSHOT" xmlns="DataspaceModel">
    <!-- Внешний тип -->
    <external-types>
        <external-type type="Client" merge-kind="ORGANIZATION"/>
        <external-type type="Document" />
    </external-types>
    <!-- Корень агрегата -->
    <class name="AccountGroup" label="Группа счетов клиента — агрегат">
        <reference name="groupClient" type="Client" unique="true"/>
        <property name="accounts" type="Account" collection="set" mappedBy="accountGroup"/>
        <property name="statementInfo" type="StatementInfo" collection="set" mappedBy="accountGroup"/>
    </class>
    <!-- Отслеживаемая сущность для демонстрации примеров событий -->
    <class name="Account" label="Клиентский счет">
        <!-- Родительская ссылка на корень агрегата -->
        <property name="accountGroup" type="AccountGroup" parent="true"/>
        <!-- Свойство примитивного типа -->
        <property name="accountType" type="String" length="4" />
        <!-- Свойство примитивного типа -->
        <property name="number" type="String" length="32" unique="true"/>
         <!-- Embeddable свойство -->
        <property name="balance" type="Balance" historical="true"/>
        <!-- Ссылка на дочернюю сущность -->
        <property name="statementInfo" type="StatementInfo" />
        <!-- Ссылка на справочник -->
        <property name="branch" type="Branch" />
        <!-- Референс на внешнюю сущность  -->
        <reference name="client" type="Client"/>
        <!-- Свойство примитивного типа Text-->
        <property name="description" type="Text" />
        <!-- Свойство примитивного типа Binary-->
        <property name="hash" type="Binary" />
        <!-- Свойство с типом enum, имя которого совпадает с именем служебного атрибута события  -->
        <property name="status" type="StatusEnum" mandatory="true"/>
        <!-- Референс на другой агрегат  -->
        <reference name="product" type="Product" />
        <!-- Коллекция дочерних элементов -->
        <property name="postings" type="Posting" collection="set" mappedBy="account"/>
        <!-- Коллекция примитивного типа  -->
        <property name="tags" type="String" collection="set" />
        <!-- Коллекция внешних референсов   -->
        <reference name="documents" type="Document" collection="set"/>
        <!-- <property name="statusForAccounting" type="Status"/> будет создан при генерации DataSpace на основании статусной модели -->
    </class>
    <!-- Enum -->
    <enum name="StatusEnum">
        <value name="ACTIVE"/>
        <value name="INACTIVE"/>
    </enum>
    <!-- Embeddable -->
    <class name="Balance" embeddable="true">
        <property name="value" type="Decimal" length="20" scale="2"/>
        <property name="currency" type="String" length="3"/>
    </class>
    <!-- Справочник -->
    <class name="Branch" is-dictionary="true">
        <property name="code" type="String" />
        <property name="name" type="String" />
        <property name="location" type="String"/>
    </class>
    <!-- Дочерний класс  -->
    <class name="Posting" label="Движение по счету">
        <property name="account" type="Account" parent="true"/>
        <property name="amount" type="Decimal" />
        <property name="description" type="String" length="255"/>
    </class>
    <!-- Еще один дочерний класс -->
    <class name="StatementInfo" label="Данные для выписки">
        <property name="accountGroup" type="AccountGroup" parent="true"/>
        <property name="periodicity" type="Integer" />
        <property name="title" type="String" />
        <property name="name" type="String" />
        <property name="deliveryAddress" type="Text" />
    </class>
    <!-- Статусная модель -->
    <status-classes class="Account">
        <stakeholder name="Accounting" code="accounting" />
    </status-classes>
    <statuses class="Account">
        <stakeholder-link code="accounting">
            <status code="open" description="счет открыт без ограничений" name="открыт" initial="true">
                <to status="frozen"/>
                <to status="closed"/>
            </status>
            <status code="frozen" name="заблокирован">
                <to status="open"/>
                <to status="closed"/>
            </status>
            <status code="closed" name="закрыт"/>
        </stakeholder-link>
    </statuses>
    <!-- Отдельный агрегат -->
    <class name="Product" label="Продукт" lockable="true">
        <property name="name" type="String" label="Наименование"/>
        <property name="code" type="String" label="Код"/>
        <reference name="client" type="Client" label="Идентификатор клиента"/>
    </class>

    <event name="AccountChangeEvent" extends="BaseChangeEvent">
        <property name="account" type="Account" parent="true"/>
        <parents-property name="statusForAccounting"/>
        <parents-property name="client"/>
    </event>

    <event name="AccountObjectEvent" extends="BaseObjectEvent">
        <property name="account" type="Account" parent="true"/>
    </event>

    <event name="AccountTrackingEvent" extends="BaseTrackingEvent">
        <property name="account" type="Account" parent="true"/>
        <parents-property name="statusForAccounting.code"/> <!-- Примечание: сохраняется code статуса, отслеживается изменения идентификатора статуса -->
        <parents-property name="client"/>
        <parents-property name="statementInfo"/> <!-- Примечание: сохраняется и отслеживается значение ссылки -->
        <parents-property name="statementInfo.title"/> <!-- Примечание: дополнительно сохраняется значение свойства title сущности по ссылке statementInfoChildRef (если ссылка не `null`) -->
    </event>

    <event name="AccountSnapshotEvent" extends="BaseSnapshotEvent" snapshot-large-properties="true"> <!-- Примечание: у события задан атрибут snapshot-large-properties="true", т.е. в данных события будут сохранены значения свойств description c типом Text и hash с типом Binary-->
        <property name="account" type="Account" parent="true"/>
        <parents-property name="status" rename="accountStatus"/> <!-- Примечание: требуется явное переименование, поскольку status пересекается с именем служебного свойства события) -->
        <parents-property name="statementInfo.title"/> <!-- Примечание: дополнительно сохраняется значение свойства title сущности по ссылке statementInfoChildRef (если ссылка не `null`) -->
    </event>

  
</model>

Элемент parents-property#

Для указания отслеживаемых свойств существует специальный элемент в модели — parents-property со следующими атрибутами:

Имя атрибута

Назначение

name

а) Имя отслеживаемого свойства в указанном родительском классе или
б) путь к свойству сущности по ссылке, при необходимости сохранять в данных события значение этого свойства на момент создания события.
В случае, если событие сохраняет состояние отслеживаемой сущности, то указанные свойства переносятся в результирующее автоматическое событие с состоянием как есть, т.е. при указании ссылки — будет полноценная ссылка, при указании embeddable — полноценное embeddable со всеми его свойствами и т.д.
Однако, если указывается путь к свойству сущности по ссылке или к свойству embeddable типа, то отслеживаемым будет значение ссылки или embeddable свойства, а сохраняться будет значение свойства по указанному пути.
Доступ к свойствам сущности по ссылке имеет смысл только для событий, сохраняющих состояние. Если значение какого-либо свойства в пути равно null, то итоговое значение будет null.

rename

Используется в событиях, сохраняющих состояние изменившейся сущности, если при сохранении необходимо переименовать родительское свойство или свойство сущности по ссылке (например, при коллизии имен).
Если переименование для свойств сущности по ссылке или embeddable не задано, то в событии будет создано свойство с тем же именем.

Различные варианты задания parents-property иллюстрирует пример ниже, в котором создается дочернее событие ParentsPropertiesSample, унаследованное от BaseTrackingEvent. Его элементы parents-property демонстрируют способы объявления свойств родительского класса.

    <event name="ParentsPropertiesSample" extends="BaseTrackingEvent">
        <property name="account" type="Account" parent="true"/>
        <!-- указание на свойство-статус statusForAccounting -->
        <parents-property name="statusForAccounting"/>
        <!-- указание на примитивное свойство number — в результирующем классе события будет создано примитивное свойство number -->
        <parents-property name="number" />
        <!-- указание на ссылку на справочник branch — в результирующем классе события будет создано свойство-ссылка на справочник Branch -->
        <parents-property name="branch" />
        <!-- указание на примитивное свойство code справочника Branch — в результирующем классе события будет создано примитивное свойство branchCode -->
        <parents-property name="branch.code" rename="branchCode" />
        <!-- указание на свойство-перечисление status, переименование необходимо, поскольку имя совпадает с именем служебного свойства события — в результирующем классе события будет создано примитивное свойство accountStatus -->
        <parents-property name="status" rename="accountStatus" />
        <!-- указание на свойство-reference product — в результирующем классе события будет создано свойство-reference -->
        <parents-property name="product" />
        <!-- указание на embeddable-свойство balance целиком — в результирующем классе события свойства будет создано такое же embeddable свойство -->
        <parents-property name="balance" />
        <!-- указание на примитивное поле value у свойства balance с типом embeddable — в результирующем классе события будет создано примитивное свойство value -->
        <parents-property name="balance.value" />
        <!-- указание на свойство-ссылку statementInfo — в результирующем классе события будет создано свойство-ссылка -->
        <parents-property name="statementInfo" />
        <!-- указание на примитивное свойство name у свойства statementInfo — в результирующем классе события будет создано примитивное свойство infoName -->
        <parents-property name="statementInfo.name" rename="infoName" />
    </event>

BaseChangeEvent#

При объявлении для сущности дочернего события, унаследованного от BaseChangeEvent, событие указанного типа будет формироваться при изменении любого из свойств сущности из списка parents-property (и только в этом случае). Значения свойств отслеживаемой сущности в данных события не сохраняются, элементы parents-property задаются только для отслеживания изменений.

Собственные свойства BaseChangeEvent:

Свойство

Тип

Описание

sysVersion

Long

Версия сущности (ее агрегата) по результатам операции

sysTimeChanged

OffsetDateTime

Метка времени изменения, одинакова для всех событий в транзакции и совпадает с меткой времени изменения агрегата

Пример объявления BaseChangeEvent события, формируемого при изменении любого из свойств statusForAccounting, product:

<event name="AccountChangeEvent" extends="BaseChangeEvent">
  <property name="account" type="Account" parent="true"/>
  <parents-property name="statusForAccounting"/>  
  <parents-property name="product"/>  
</event>

BaseObjectEvent#

При объявлении для сущности дочернего события, унаследованного от BaseObjectEvent, событие указанного типа будет формироваться при изменении любого из свойств сущности (включая изменение коллекций примитивов и референсов), а также создании и удалении объекта. Эти события не хранят состояния отслеживаемой сущности. Для данного типа запрещено явное указание элементов parents-property, т.к. это не несет смысла.

Внимание!

Коллекции ссылок с типом связи «один-ко-многим» и mapped-by ссылки с типом связи один-к-одному не отслеживаются ни одним из типов событий.

Внимание!

При использовании данного вида события следует учитывать особенности отслеживания изменений сущностей при выполнении пакета команд Packet (см. раздел «Особенности отслеживания изменений сущностей при выполнении пакета команд Packet» документа «Руководство прикладного разработчика».

В модели данных может быть только одно событие BaseObjectEvent для каждого пользовательского типа.

Собственные свойства BaseObjectEvent:

Свойство

Тип

Описание

sysVersion

Long

Версия сущности (ее агрегата) по результатам операции

sysTimeChanged

OffsetDateTime

Метка времени изменения. Одинакова для всех событий в транзакции и совпадает с меткой времени изменения агрегата

sysObjectEvent

enum SysObjectEvent

Тип действия, совершенного над отслеживаемой сущностью: C— create, U— update, D— delete

Пример объявления BaseObjectEvent события, формируемого при изменении любого из свойств родительского класса Account, кроме postings, при создании и удалении объектов типа Account.

<event name="AccountObjectEvent" extends="BaseObjectEvent">
  <property name="account" type="Account" parent="true"/>
</event>

BaseTrackingEvent#

При объявлении для сущности дочернего события, унаследованного от BaseTrackingEvent, событие указанного типа будет формироваться при изменении любого из перечисленных в parents-property свойств сущности, а также создании и удалении объекта. Значения указанных при помощи элементов parents-property свойств родительской сущности по состоянию на момент завершения операции сохраняются в копии этих свойств в данных события. При удалении сущности сохраняются значения, которые эти свойства имели непосредственно перед удалением. Доступна возможность сохранения свойств сущностей по ссылке, а также переименования сохраняемых свойств.

Внимание!

Элементы parents-property типов BaseTrackingEvent не могут указывать на коллекционные свойства.

Собственные свойства BaseTrackingEvent:

Свойство

Тип

Описание

sysVersion

Long

Версия сущности (ее агрегата) по результатам операции

sysTimeChanged

OffsetDateTime

Метка времени изменения. Одинакова для всех событий в транзакции и совпадает с меткой времени изменения агрегата

sysObjectEvent

enum SysObjectEvent

Тип действия, совершенного над отслеживаемой сущностью: C— create, U— update, D— delete

sysChangeUser

String

Информация о пользователе, изменившем данные, используется значение из HTTP-заголовка (конфигурирование способа получения информации описано в разделе «Способы получения информации о пользователе» документа «Руководство по системному администрированию»)

При изменении атрибутного состава родительского класса (например, в родительском классе добавили или удалили атрибут, или изменили тип, или изменился признак deprecated) класс события также обновится соответствующим образом.

Пример объявления BaseTrackingEvent события, формируемого при изменении любого из свойств statusForAccounting, description, statementInfo, а также при создании и удалении объектов:

<event name="AccountTrackingEvent" extends="BaseTrackingEvent">
    <property name="account" type="Account" parent="true"/>
    <!-- Сохраняется code статуса, отслеживается изменения идентификатора статуса -->
    <parents-property name="statusForAccounting.code"/>
    <parents-property name="description"/>
    <!-- Сохраняется и отслеживается значение ссылки -->
    <parents-property name="statementInfo"/>
    <!-- Дополнительно сохраняется значение свойства сущности по ссылке (если ссылка не `null`) -->
    <parents-property name="statementInfo.title"/> 
</event>

BaseSnapshotEvent#

При объявлении для сущности дочернего события, унаследованного от BaseSnapshotEvent, событие указанного типа будет формироваться при изменении любого из свойств сущности (включая изменение коллекций примитивов и референсов), а также создании и удалении объекта. Это событие сохраняет в своих данных состояние всех свойств по состоянию на момент завершения операции, за исключением коллекций и свойств с типами Text и Binary (то есть в классе события будут созданы соответствующие свойства). При удалении сущности сохраняются значения, которые эти свойства имели непосредственно перед удалением. Помимо этого, через элементы parents-property доступна возможность сохранения свойств сущностей по ссылке, а также переименования сохраняемых свойств. Если необходимо сохранять и свойства с типами Text и Binary, то следует явно указать имена нужных свойств при помощи элемента parents-property. Также имеется возможность сохранять значения всех свойств с типами Text и Binary, задав на элементе event события атрибут snapshot-large-properties="true".

Внимание!

Элементы parents-property типов BaseSnapshotEvent не могут указывать на коллекционные свойства.

Внимание!

Следует учесть, что данные свойств с типами Text и Binary могут занимать значительное место. Поэтому в событии следует сохранять только то, что действительно необходимо фиксировать на момент транзакции (что нельзя, например, получить при помощи GraphQL-query при публикации).

Внимание!

При использовании данного вида события следует учитывать особенности отслеживания изменений сущностей при выполнении пакета команд Packet (см. раздел «Особенности отслеживания изменений сущностей при выполнении пакета команд Packet» документа «Руководство прикладного разработчика».

Состав собственных атрибутов BaseSnapshotEvent совпадает с BaseTrackingEvent.

При изменении атрибутного состава родительского класса (например, в родительском классе добавили или удалили атрибут, или изменили тип, или изменился признак deprecated) класс события также соответствующим образом изменится.

Пример объявления BaseSnapshotEvent события, формируемого при изменении любого из свойств parent класса, кроме postings, а также при создании и удалении объектов:

<event name="AccountSnapshotEvent" extends="BaseSnapshotEvent" snapshot-large-properties="true"> <!-- Примечание: включен snapshot-large-properties, т.е. сохраняются и свойства `someText`, `someBinary` -->
    <property name="account" type="Account" parent="true"/>
    <!-- Дополнительно сохраняется значение свойства сущности по ссылке (если ссылка не `null`) -->
    <parents-property name="statementInfo.title"/>
    <!-- Необходимо явное переименование, поскольку status пересекается с именем служебного свойства события) -->
    <parents-property name="status" rename="accountStatus"/>
</event>

Спецификация подписок#

Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml. При сборке проекта файл попадает в конфигурацию DataSpace Subscription. При старте сервис DataSpace Subscription читает этот файл и загружает подписки в таблицу T_DSPC_SYS_SUBSCRIPTION.

Пример:

<subscriptions xmlns="DataspaceSubscriptions">
  <subscription id="mergeSubscription1" 
                name="EPKMergeForProduct" 
                description="Публикация события с слиянии во внешнюю систему" 
                target="REST" 
                eventType="ProductClientMergeEvent" 
                callback="${my.merge.subscription.1.url}/api/v1/products" 
                validTill="20240101T00:00:00Z" 
                maxRetryAttempts="${my.merge.subscription.1.nretry}" 
                timeoutMs="${my.merge.subscription.1.timeout}" 
                retryDelayMs="${my.merge.subscription.1.retrydelay}" 
                async="false" 
                blocking="true"
                idempotenceHeaderName="requestUID">
    <criteria>
    root.referenceName=='client' &amp;&amp; coalesce(root.mergeFailed, false) != true
    </criteria>
    <query>
        {
        searchProduct(cond: "it.$id=='${product}'") {
            elems {
                code
                name
                services {
                    elems {
                        code
                        name
                    }
                }
                oneToOneLink {
                    name
                }
            }
          }
        }
    </query>
    <template>
        [
        {
            "operation": "shift",
            "spec": {
                "data": {
                    "searchProduct": {
                        "elems": {
                            "*": {
                                "code": "ProductCodeMappedTo",
                                "oneToOneLink": {
                                    "name": "oneToOneLinkNameMappedTo"
                                },
                                "services": {
                                    "elems": {
                                        "*": {
                                            "code": "servicesMappedTo"
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        ]
    </template>
    <headers>
      -XTenantId=${my.merge.subscription.1.tenantId}
    </headers>
  </subscription>
</subscriptions>

Описание полей:

  • id — уникальный идентификатор подписки, используется в качестве primary key в таблице подписок T_DSPC_SYS_SUBSCRIPTION.

    Внимание!

    Значение «0» зарезервировано системой.

  • name — наименование подписки.

  • description — текстовое описание подписки.

  • target — способ публикации (REST, KAFKA, GRAPHQLSUBS).

  • eventType — тип событий (entity name), на которые осуществляется подписка.

  • callback:

    • REST API: URL для публикации сообщения. Можно указать HTTP-метод (POST, GET, PUT, PATCH, DELETE). HTTP-метод (если он указан) должен быть первым элементом строки, указанной в поле callback. Разрешается любой регистр и любое количество пробелов до и после. Если HTTP-метод не указан, то по умолчанию будет использован POST. Параметры URL могут содержать placeholders формата ${<наименование_свойства_события>}. Перед выполнением вызова placeholders будут заменены на соответствующие значения свойств события. Пример: callback="GET localhost:8080/api/v1/products/touch?${fromClient}" .

    • Kafka: <наименование_кластера>:<наименование_topic>. Пример: callback="PPK_KAFKA:topicName".

  • validTill — время окончания действия подписки.

  • maxRetryAttempts — количество повторных попыток публикации (актуально только для target == REST).

  • timeoutMs — тайм-аут публикации, в миллисекундах (актуально только для target == REST).

  • retryDelayMs — задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST).

  • async — использование асинхронной отправки (true/false).

  • blocking — приостановление отправки при первой ошибке (true/false).

  • idempotenceHeaderName — наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности. При всех попытках отправки одного и того же сообщения заголовок принимает одинаковое значение. Если атрибут не задан или значение пустое — заголовок не формируется.

  • criteria — строковое выражение для фильтрации записей события. С описанием формата строковых выражений можно ознакомиться в документе «Строковые выражения».

  • query — текст GraphQL-запроса для чтения дополнительных данных. С GraphQL-API можно ознакомиться в документе «Протокол GraphQL». Текст запроса может содержать placeholder формата ${<наименование_свойства_события>}. Перед выполнением GraphQL-запроса, placeholder будут заменены на соответствующие значения свойств события. <наименование_свойства_события> — наименование свойства JPA-класса события entity, name которого прописан в поле eventType у конкретного subscription в subscriptions.xml.

  • template — спецификация трансформации JSON-данных в JSON-сообщение. Для трансформации по умолчанию используется библиотека JOLT, поэтому для описания преобразования в параметре template используется формат спецификации этой библиотеки. Исходные данные для трансформации по шаблону имеют общий вид:

    {
      "event": {
        <атрибуты события>
      },
      "data": {
        <результат выполнения graphql-запроса. Если graphql-запрос в подписке не задан или не вернул данных  элемент пустой>
      }
    }
    
  • headers — спецификация дополнительных заголовков сообщения. Формат: <наименование_заголовка>=<значение>. <значение> может содержать Placeholder формата ${<наименование_свойства_события>}, которые будут заменены аналогично логике, описанной для поля query.

Примечание

На старте сервиса DataSpace Subscription проводится валидация полей criteria,query, template. Если валидация не проходит, сервис завершает свою работу.

Примечание

В качестве placeholder могут использоваться следующие типы свойств события:

  • Примитивы, enum ${<наименование_свойства_события>};

  • Ссылки ${<наименование_ссылки_события>} (подставляется значение id);

  • Reference ${<наименование_reference_события>} (подставляется значение entityId);

  • Embeddable ${<наименование_embeddable_события>.<наименование_свойства_embeddable>} (подставляется значение свойства embeddable).

Внимание!

Поля criteria,query, template и headers не могут содержать пустое значение. Если не требуется задавать значение для данных полей, то не нужно указывать соответствующие теги.

Стендозависимая конфигурация subscriptions.xml#

Потребитель может использовать в значениях полей target, callback, template, headers, maxRetryAttempts, timeoutMs, retryDelayMs Placeholder, которые заполнятся при загрузке модуля из конфигурации приложения. Конфигурация приложения может быть параметризована и сделана стендозависимой.

Для этого необходимо добавить в файл customer-properties-subscription.yaml список параметров со значениями. В качестве значения может быть использован также параметр развертывания, например, {{tenantId}}. Эти параметры должны быть перечислены также в файле custom_property.conf.yml, могут быть как со значениями по умолчанию, так и без значений. При установке на стенд этим параметрам должны быть заданы значения в стендозависимой конфигурации.

Подробнее см. в документе «Руководство по установке» в следующих разделах:

  • «Добавление пользовательских параметров в дистрибутив».

  • «Использование пользовательских параметров в рамках сервиса DataSpace».

Пример customer-properties-subscription.yaml:

kind: ConfigMap
apiVersion: v1
metadata:
  name: customer-properties
data:
  customer.properties: |-
    my.merge.subscription.1.url={{ customer.value1 }}
    my.merge.subscription.1.nretry=1
    my.merge.subscription.1.timeout={{ customer.value2 }}
    my.merge.subscription.1.retrydelay=2000
    my.merge.subscription.1.tenantId={{ customer.value3 }}

Пример custom_property.conf.yml:

customer.value1=http://localhost:8082
customer.value2=10000
customer.value3=tenantId
# ...

Гарантия последовательности публикации событий

Для гарантии последовательной отправки событий по агрегату необходимо для подписки:

  • выключить async-режим: async="false";

  • включить blocking-режим: blocking="true".

Комбинацию async="true" blocking="true" использовать не следует, т.к. последовательность обработки в этом случае не гарантируется.

Процессы и масштабирование#

Основную логику сервиса DataSpace Subscription реализуют процессы EVENT_TRANSFER и EVENT_WORKER.

Задача процесса EVENT_TRANSFER состоит в том, чтобы для каждого события, по которому есть валидные подписки, сформировать записи в таблице очереди на публикацию T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего событие считается обработанным. При этом для всех выбранных новых event (для которых нашлись валидные подписки и были созданы записи в T_DSPC_SYS_EVENT_QUEUE_ITEM, а также для которых подписок не нашлось) статус меняется на PROCESSED.

Процесс EVENT_WORKER обрабатывает таблицу очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM: читает соответствующее событие с применением условия criteria подписки, выполняет запрос дополнительных данных, используя query, трансформирует данные в итоговое сообщение по шаблону template и осуществляет публикацию сообщения на основании target и callback.

Предусмотрена возможность масштабирования процессов EVENT_TRANSFER и EVENT_WORKER. За количество процессов, которые будут запущены, отвечают настройки dataspace.subscription.event-transfer.transfer-processes-count и dataspace.subscription.event-worker.worker-processes-count соответственно. Реализована возможность распределения процессов по партициям для того, чтобы процессы обрабатывали каждый свой набор данных. За количество партиций для процессов EVENT_TRANSFERи EVENT_WORKER отвечают настройки dataspace.subscription.event-transfer.partitions-number и dataspace.subscription.event-worker.partitions-number соответственно. Партиции ведутся в разрезе подписок. Для процессов EVENT_TRANSFER зарезервирован идентификатор подписки со значением «0».

Внимание!

Настройки количества партиций для процессов EVENT_TRANSFER и EVENT_WORKER должны иметь одинаковые значение для всех экземпляров сервиса DataSpace Subscription, работающих с одной БД. При рассогласовании значений данных настроек между разными экземплярами процессы будут брать в работу один и тот же набор данных, что приведет к нарушению порядка обработки событий.

Также запрещено менять данные настройки в runtime.

Служебная таблица T_DSPC_SYS_SUBSCRIPTION_WORKER содержит информацию о партициях в разрезе подписок. Недостающие партиции добавляет служебный процесс на старте сервиса. Процессы EVENT_TRANSFER и EVENT_WORKER в процессе своей работы проверяют таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER на предмет необрабатываемых партиций и берут их в работу, прописывая свой идентификатор в поле PROCESSID. Партиция считается необрабатываемой, если PROCESSID не установлен, либо ее heartbeat не обновлялся определенное время, которое задается в настройках dataspace.subscription.event-transfer.heartbeat-timeout-sec и dataspace.subscription.event-worker.heartbeat-timeout-sec для процессов EVENT_TRANSFER и EVENT_WORKER соответственно.

Набор данных разбивается по партициям при помощи предварительно рассчитанного хеша, который хранится в поле PARTITIONHASH в таблицах событий и очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM. При обработке записей процессы EVENT_TRANSFER и EVENT_WORKER вычисляют партицию, к которой принадлежит запись по формуле PARTITIONHASH mod <количество_партиций>. Если полученная партиция присутствует среди партиций, назначенных процессу, то он выполняет обработку записи.

Таблица T_DSPC_SYS_SUBSCRIPTION_PROCESS также способствует управлению масштабированием процессов. При старте сервиса процессы регистрируются в данной таблице, записывая свой идентификатор в поле OBJECT_ID. В процессе работы процессы пересчитывают значение поля MAXPARTITIONS, которое содержит максимальное количество партиций, обрабатываемых процессом, по следующей формуле: <количество_подписок> * <количество_партиций> / <количество_процессов>. Если процессу назначено больше партиций, чем MAXPARTITIONS, то он освободит нужное количество.

Поскольку EVENT_WORKER может обрабатывать последовательно партиции разных подписок, то при сбое одного из получателей может возникнуть задержка обработки для других подписок. Для того чтобы работоспособные получатели продолжали получать сообщения с неизменной скоростью, реализован специальный тип процессов — ERROR_WORKER.

Алгоритм действия сервиса при возникновении ошибки по одной из партиций подписки:

  1. Все партиции подписки, при обработке которой произошла ошибка, EVENT_WORKER переключает в состояние state = ERROR в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER и перестает их обрабатывать, установив значение processId = null.

  2. Процесс ERROR_WORKER выбирает для обработки партиции с состоянием ERROR или CIRCUIT_BREAKING, устанавливая свой processId для соответствующих партиций.

  3. Процесс ERROR_WORKER обрабатывает полученные партиции по алгоритму, аналогичному EVENT_WORKER.

  4. Если очередное событие было успешно обработано, то ERROR_WORKER должен переключить партицию в состояние state = UNASSIGNED и установить processId = null для того, чтобы ее снова подхватил EVENT_WORKER.

При этом за время восстановления в очереди на отправку могло накопиться значительное число событий, что также может повлиять на скорость отправки по тем подпискам, по которым ошибки отсутствуют. Поэтому перед переключением партиции ERROR_WORKER анализирует размер очереди по партиции и, если он выше порогового значения (dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold), то ERROR_WORKER продолжает обрабатывать партицию, пока размер очереди не станет ниже порогового значения.

Для процессов ERROR_WORKER также предусмотрена возможность масштабирования:

  • За количество процессов, которые будут запущены, отвечает настройка dataspace.subscription.event-worker.error-worker-processes-count.

  • Количество партиций для процессов ERROR_WORKER равно количеству партиций, установленному для процессов EVENT_WORKER (dataspace.subscription.event-worker.partitions-number).

Интеграция с GraphQL Subscriptions#

Сервис DataSpace Subscription обеспечивает реализацию функциональности GraphQL Subscriptions для сервиса DataSpace Core.

Для настройки данной интеграции должны быть сконфигурированы следующие параметры сервиса DataSpace Subscription:

  • dataspace.graphql.subscriptions.enabled — включает функциональность, обслуживающую работу GraphQL Subscription для сервиса DataSpace Core (по умолчанию равен «true» — включено);

  • dataspace.subscription.grpcServerPort — задает порт, на котором будет запущен gRPC-сервер для обслуживания запросов GraphQL Subscription (по умолчанию равен «9001»);

  • dataspace.subscription.event-worker.gql-worker-processes-count — количество процессов GQL_EVENT_WORKER, обрабатывающих события (по умолчанию равен «1»).

Служебные таблицы#

T_DSPC_SYS_SUBSCRIPTION — таблица подписок. Данные загружаются из файла subscriptions.xml при старте сервиса.

Содержит следующие поля:

Поле

Описание

OBJECT_ID

Идентификатор подписки, задает пользователь (значение «0» зарезервировано системой)

NAME

Имя подписки (для отображения, включая labels мониторинга)

DESCRIPTION

Описание подписки

EVENTTYPE

Entity name событий, на которые осуществляется подписка

CRITERIA

Строковое выражение для фильтрации записей события

QUERY

Текст GraphQL-запроса для чтения дополнительных данных

TEMPLATE

Спецификация трансформации JSON-данных в JSON-сообщение

TARGET

Способ публикации («REST», «KAFKA», «GRAPHQLSUBS»)

CALLBACK

URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_topic> в случае Kafka

HEADERS

Спецификация дополнительных заголовков сообщения

IDEMPOTENCEHEADERNAME

Наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности

ASYNC

Признак использования асинхронной отправки («true»/»false»)

BLOCKING

Признак приостановки отправки при первой ошибке («true»/»false»)

MAXRETRYATTEMPTS

Количество повторных попыток публикации (актуально только для TARGET == REST)

RETRYDELAYMS

Задержка между повторными попытками публикации, в миллисекундах (актуально только для TARGET == REST)

TIMEOUTMS

Тайм-аут публикации, в миллисекундах (актуально только для TARGET == REST)

VALIDTILL

Время окончания действия подписки

SYS_OWNERID

Владелец подписки (ID тенанта)

SYS_LASTCHANGEDATE

Дата последнего изменения записи

T_DSPC_SYS_EVENT_QUEUE_ITEM — таблица очереди обработки событий. Заполняется процессами EVENT_TRANSFER. Обрабатывается процессами EVENT_WORKER.

Содержит следующие поля:

Поле

Описание

OBJECTID_EVENTID

Ссылка на событие (часть первичного ключа)

OBJECTID_SUBSCRIPTIONID

Ссылка на подписку (часть первичного ключа)

EVENTTYPE

Entity name события

STATUS

Статус обработки элемента очереди событий («NEW», «SENT», «ERROR», «SKIP»)

AGGREGATEROOTTYPE

Тип корня агрегата event, определяется по метаданным

AGGREGATEROOTID

Идентификатор корня агрегата события

PARTITIONHASH

Предварительно рассчитанный хеш для деления по партициям

IDEMPOTENCEHEADER

UUID-значение, формируемое при создании публикации и передаваемое в специальном заголовке при публикации сообщения

CREATIONTIMESTAMP

Время добавления события в очередь

SYS_LASTCHANGEDATE

Дата последнего изменения записи

Значения поля STATUS:

  • NEW — новый элемент очереди событий, еще не был обработан;

  • SENT — элемент очереди событий успешно обработан, сообщение отправлено;

  • ERROR — при обработке произошла ошибка, элемент будет обработан повторно;

  • SKIP — подходящее событие не найдено в процессе обработки элемента очереди событий (скорее всего событие не прошло по условию, указанному в поле criteria подписки).

T_DSPC_SYS_SUBSCRIPTION_WORKER

Содержит следующие поля:

Поле

Описание

OBJECTID_PARTITIONNUMBER

Номер партиции (часть первичного ключа)

OBJECTID_SUBSCRIPTIONID

Ссылка на подписку (часть первичного ключа)

HEARTBEAT

Время последнего обновления

PROCESSID

Идентификатор процесса

STATE

Состояние обработки партиции («ACTIVE», «CIRCUIT_BREAKING», «UNASSIGNED», «ERROR»)

T_DSPC_SYS_SUBSCRIPTION_PROCESS

Содержит следующие поля:

Поле

Описание

OBJECT_ID

Идентификатор процесса (первичный ключ)

MAXPARTITIONS

Максимальное количество партиций, обрабатываемых процессом

KIND

Вид процесса (worker/перекладчик) («EVENT_WORKER», «EVENT_TRANSFER»)

HEARTBEAT

Время последнего обновления

Конфигурация#

dataspace.subscription.idempotenceHeaderUuidWithHyphens
#Значение по умолчанию: "true"
#true — UUID для заголовка идемпотентности idempotenceHeaderName формируется с дефисами, в результате его размер равен 36 символам
#false — UUID для заголовка идемпотентности idempotenceHeaderName формируется без дефисов, в результате его размер равен 32 символам

dataspace.subscription.event-transfer.transfer-processes-count
#Значение по умолчанию: 1
#Количество процессов перекладчика (EVENT_TRANSFER)

dataspace.subscription.event-transfer.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы перекладчика

dataspace.subscription.event-transfer.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов перекладчика (EVENT_TRANSFER)

dataspace.subscription.event-transfer.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс перекладчика

dataspace.subscription.event-transfer.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс перекладчика будет считаться неактивным

dataspace.subscription.event-transfer.max-transfer-count
#Значение по умолчанию: 1000
#Количество событий, обрабатываемых перекладчиком за один цикл работы

dataspace.subscription.event-worker.worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER

dataspace.subscription.event-worker.error-worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER, обрабатывающего ошибки (ERROR_WORKER)

dataspace.subscription.event-worker.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы процесса EVENT_WORKER

dataspace.subscription.event-worker.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов EVENT_WORKER

dataspace.subscription.event-worker.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс EVENT_WORKER

dataspace.subscription.event-worker.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс EVENT_WORKER будет считаться неактивным

dataspace.subscription.event-worker.circuit-breaker-error-count-threshold
#Значение по умолчанию: 10
#Количество ошибок по подписке, по достижении которого включается circuit breaker на эту подписку

dataspace.subscription.event-worker.circuit-breaker-timeout-ms
#Значение по умолчанию: 30000 миллисекунд
#Время с момента включения circuit breaker на подписку, по истечении которого circuit breaker будет выключен

dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold
#Значение по умолчанию: 100
#Пороговое значение размера очереди накопившихся сообщений по подписке
#Пока размер очереди выше указанного значения порога, `ERROR_WORKER` не будет переключать партицию обратно на `EVENT_WORKER`

dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker
#Значение по умолчанию: 5
#Максимальное количество потоков (на процесс) в пуле потоков, используемом для асинхронной отправки
#Результирующее максимальное количество потоков равно:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)

dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker
#Значение по умолчанию: 1000
#Максимальное количество элементов в очереди (на процесс) пула потоков, используемом для асинхронной отправки
#Результирующий размер очереди равен:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)

dataspace.subscription.event-worker.gql-worker-processes-count
#Значение по умолчанию: 1
#Количество процессов GQL_EVENT_WORKER, обрабатывающих события для работы функциональности GraphQL Subscription сервиса DataSpace Core
#Процессы типа GQL_EVENT_WORKER не отображаются в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS

dataspace.subscription.service-tasks.pool-size
#Значение по умолчанию: 5
#Количество потоков в пуле служебных процессов

dataspace.subscription.service-tasks.delete-dead-processes-schedule-delay-ms
#Значение по умолчанию: 10000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который удаляет неактивные процессы EVENT_TRANSFER и
#EVENT_WORKER из таблицы T_DSPC_SYS_SUBSCRIPTION_PROCESS

dataspace.subscription.service-tasks.load-subscriptions-schedule-delay-ms
#Значение по умолчанию: 1000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который выполняет загрузку подписок из файла subscriptions.xml
#в таблицу T_DSPC_SYS_SUBSCRIPTION, а также вставку недостающих партиций в таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER
#После успешной загрузки процесс перестает запускаться

dataspace.subscription.service-tasks.cleanup-depth-hr
#Значение по умолчанию: 48 часов
#Максимальное время хранения записей в таблицах событий и таблице очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего
#записи будут удалены фоновым процессом, если они уже были обработаны: status == PROCESSED для событий 
#и status == SENT для записей T_DSPC_SYS_EVENT_QUEUE_ITEM

dataspace.subscription.service-tasks.cleanup-cron
#Значение по умолчанию: 0 0 0 * * * (каждый день в 00:00)
#Расписание запуска служебного процесса, который удаляет отработанные старые записи из таблиц событий и
#таблицы очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM

dataspace.subscription.service-tasks.tasks-schedule-initial-delay-ms
#Значение по умолчанию: 90000 миллисекунд
#Задержка перед запуском всех фоновых процессов на старте сервиса. Применяется для обеспечения корректной установки health-метрики
#readiness на старте сервиса.

dataspace.subscription.xml.enable-check-on-start
#Значение по умолчанию: true
#Включает/выключает валидацию полей query, criteria, template у подписок, описанных в subscription.xml при старте сервиса
#Только для стендов разработки! При эксплуатации в промышленных условиях должна быть установлена в значение "true"

dataspace.graphql.subscriptions.enabled
#Значение по умолчанию: true
#Включает/выключает функциональность, обслуживающую работу GraphQL Subscription для сервиса DataSpace Core.

dataspace.subscription.grpcServerPort
#Значение по умолчанию: 9001
#Порт, на котором будет запущен gRPC-сервер для обслуживания запросов GraphQL Subscription

Конфигурация в рамках ручной установки сервиса#

В случае использовании Helm-инсталляции в рамках раздела «Ручная установка сервиса» документа «Руководство по установке» необходимо перед инсталляцией задать параметры в values.yaml:

appConfig:
  overrideProperties:
    dataspace:
        subscription:
            publisher:
                kafka:
                    config:
                        <наименование_кластера>:
                            acks: ''
                            bootstrap-servers: ''
                            delivery-timeout-ms: ''
                            enable-idempotence: ''
                            key-serializer: org.apache.kafka.common.serialization.StringSerializer
                            linger-ms: ''
                            max-in-flight-requests-per-connection: ''
                            request-timeout-ms: ''
                            retry-backoff-ms: ''
                            # Параметры SSL подключения
                            security-protocol: SSL
                            ssl-endpoint-identification-algorithm: ''
                            ssl-keystore-location: ''
                            ssl-truststore-location: ''
                            value-serializer: org.apache.kafka.common.serialization.StringSerializer

В среде контейнеризации создать секрет Kind: Secret, указав его имя в параметре в values.yaml:

   appConfig:
    subscription:
       config: "dspc-subscription-ssl"
apiVersion: v1
kind: Secret
metadata:
  name: dspc-subscription-ssl
data:
  subscription.properties: |-
    dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-password=
    dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-password=
    dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-key-password=

Конфигурация в рамках автоматизированной установки сервиса#

В случае использовании автоматизированной установки в рамках раздела «Автоматизированная сборка и установка сервиса» документа «Руководство по установке», если предполагается использование функциональности публикации событий в Kafka, необходимо задать настройки для каждого кластера Kafka в файле customer-properties-subscription.yaml согласно разделу «Добавление пользовательских параметров в дистрибутив» в следующем виде (пример):

kind: ConfigMap
apiVersion: v1
metadata:
  name: customer-properties
data:
  customer.properties: |-
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.bootstrap-servers={{ local.kafka.bootstrap.servers }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.acks={{ local.kafka.acks }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.delivery-timeout-ms={{ local.kafka.delivery.timeout.ms }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.request-timeout-ms=200
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.retry-backoff-ms=500
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.linger-ms={{ local.kafka.linger.ms }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.enable-idempotence={{ local.kafka.enable.idempotence }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.max-in-flight-requests-per-connection={{ local.kafka.max.in.flight.requests.per.connection }}

    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.security-protocol=SSL
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-location=/deployments/credentials/kafkaSslKeystore
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-location=/deployments/credentials/kafkaSslTrustwtore
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-endpoint-identification-algorithm=

Пример Kind: Secret:

apiVersion: v1
kind: Secret
metadata:
  name: dspc-subscription-ssl
data:
  subscription.properties: |-
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-password=12456
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-password=12456
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-key-password=12456

Пример custom_property.conf.yml:

local.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
local.kafka.acks=all
local.kafka.delivery.timeout.ms=5000
local.kafka.linger.ms=0
local.kafka.enable.idempotence=true
local.kafka.max.in.flight.requests.per.connection=5
# ...

При необходимости параметры можно сделать стендозависимыми по аналогии со стендозависимой конфигурацией subscriptions.xml (см. раздел «Стендозависимая конфигурация subscriptions.xml»).

Внимание!

При использовании SSL установка значения «true» для параметра dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotence может приводить к ошибке org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. Для того чтобы этого избежать, необходимо корректным образом настроить операцию IdempotentWrite для кластера. Подробнее см. в официальной документации продукта Kafka.

В файле subscriptions.xml target необходимо задавать в формате <наименование_кластера>:<наименование_topic>.

Пример: LOCAL_KAFKA:test-topic.

Retry#

Параметры retry/timeout для Kafka задаются в настройках Kafka на конкретный кластер <наименование_кластера>. Для этого необходимо использовать параметры delivery-timeout-ms, request-timeout-ms, retry-backoff-ms.

Параметры retry/timeout для REST берутся из параметров подписки: maxRetryAttempts, timeoutMs, retryDelayMs. Для ошибок с http status code 4xx retry не выполняется (то есть retry — только для 5xx и в случае тайм-аута).

Для идемпотентности потребитель может определить необходимый заголовок (передавая какое-либо из полей event с уникальным значением).

Circuit breaking#

На уровне параметра подписки blocking вводится настройка режима обработки ошибки:

  • blocking=true — при первой ошибке запрет на отправку остальных публикаций по партиции подписки.

  • blocking=false — разрешена публикация других событий по подписке до наступления условия приостановки.

Поведение при ошибках:

  1. Производится фиксация ошибки. Статус «ERROR» устанавливается для элемента очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM.

  2. Если параметр blocking=false, то продолжается обработка следующих сообщений, при этом счетчик ошибок по подписке увеличивается.

  3. Если параметр blocking=true, то полностью приостанавливается отправка по этой партиции, при этом счетчик ошибок по подписке увеличивается.

  4. При накоплении порогового значения ошибок по подписке (настройка dataspace.subscription.event-worker.circuit-breaker-error-count-threshold равна, например, «10») приостанавливается отправка по этой подписке. При этом включается circuit breaker на всю подписку на один экземпляр приложения.

  5. Спустя время, указанное в настройке dataspace.subscription.event-worker.circuit-breaker-timeout-ms, производится попытка повторной отправки, начиная с тех, чья отправка завершилась с ошибкой.

  6. Успешная отправка сбрасывает счетчик ошибок.

Примечание

Каждый процесс EVENT_WORKER самостоятельно принимает решение о приостановке и возобновлении обработки партиции. Для того чтобы не останавливать весь поток при ошибке в данных одной из публикаций приостановится только одна партиция.

Мониторинг#

Мониторинг включается настройкой:

dataspace.subscription.metrics.enabled
#"false" — метрики выключены (значение по умолчанию)
#"true" — метрики включены

Реализованы следующие метрики мониторинга:

Наименование метрики

Тип метрики

Теги

Описание

dspc.subscription.handle.time

Timer

subscriptionId — идентификатор подписки, status — значение поля status T_DSPC_SYS_EVENT_QUEUE_ITEM на момент сбора метрики

Время обработки событий в разрезе подписки и результата отправки

dspc.subscription.publication.time

Timer

subscriptionId — идентификатор подписки, status — значение поля status T_DSPC_SYS_EVENT_QUEUE_ITEM на момент сбора метрики

Время отправки сообщений в разрезе подписки и результата отправки

dspc.subscription.errors.count

Counter

subscriptionId — идентификатор подписки

Количество ошибок отправки событий в разрезе подписки

dspc.subscription.skip.count

Counter

subscriptionId — идентификатор подписки

Количество отсеянных событий в разрезе подписки

dspc.subscription.transfer.count

Counter

Скорость перекладки событий

dspc.subscription.partition.state

Gauge

subscriptionId — идентификатор подписки, state — значение поля state T_DSPC_SYS_SUBSCRIPTION_WORKER на момент сбора метрики

Статус обработки партиций — количество по T_DSPC_SYS_SUBSCRIPTION_WORKER в разрезе state и subscriptionId

dspc.subscription.event.queue.size

Gauge

subscriptionId — идентификатор подписки, status — значение поля status T_DSPC_SYS_EVENT_QUEUE_ITEM на момент сбора метрики

Количество элементов очереди обработки событий в таблице T_DSPC_SYS_EVENT_QUEUE_ITEM в разрезе подписок и статусов.

dspc.subscription.new.events.count

Gauge

eventType — тип события

Отображает количество событий со статусом NEW каждого типа.

Метрики dspc.subscription.partition.state, dspc.subscription.event.queue.size и dspc.subscription.new.events.count обновляются фоновым scheduled-процессом. Частота обновления данных метрик регулируется настройкой:

dataspace.subscription.metrics.schedule-delay-ms
#Значение по умолчанию — 10000 миллисекунд

API повторной отправки событий#

REST API POST /admin/api/subscriptions/<id>/resend позволяет повторно отправить сообщения для событий, которые были уже обработаны ранее.

Внимание!

Переотправка в topic Kafka допустима только при единственной системе, читающей topic, по инициативе этой системы. Все остальные читатели могут получить ранее обработанные ими данные, в результате чего могут образоваться дубли.

Примечание

Переотправка возможна только для тех событий, по которым присутствуют записи в таблицах событий и очереди публикации, то есть они еще не были очищены. Необходимо задавать глубину очистки очередей событий с учетом требований по их возможной переотправке.

Примечание

При наличии в подписке GraphQL-query при повторной отправке он будет выполнен повторно на текущих данных. Как следствие — данные публикации могут отличаться от предыдущего вызова. Если для каких-то данных это критично, то необходимо завести соответствующие атрибуты в самом событии и заполнять их при создании события.

Примечание

Переотправка невозможна для GraphQL-подписок.

Параметры:

  • id — идентификатор подписки;

  • тело запроса формата json:

    {
        "filter" : "root.fromReference=='2' && root.referenceName=='client'",
        "timeRange" :
           {
             "from" : "2023-11-29T10:39:46.587",
             "to" : "2023-12-01T10:39:46.587"
           }
    }
    
  • filter — фильтр по атрибутам события, написанный на языке строковых выражений;

  • timeRange.from, timeRange.to — временной диапазон (учитывается поле creationTimestamp сущности события) для выбора событий, сообщения по которым будут отправлены повторно.

Требования к запросу:

  • Один из параметров filter или timeRange обязательно должен быть заполнен, иначе API вернет ошибку 405 Method Not Allowed с сообщением «Не задано ни одного условия отбора событий».

  • Если задан параметр timeRange, то to и from обязательны к заполнению. Если to или from не заполнены, то API вернет ошибку 400 Bad Request с сообщением «Поле timeRange#from обязательно для заполнения» или «Поле timeRange#to обязательно для заполнения».

При успешном вызове API возвращает код 200 OK и количество событий, по которым были повторно отправлены сообщения. Если не было найдено не одного события, подходящего под условие выборки, сформированного на основе переданных параметров, то API вернет ошибку 404 Not Found c сообщением «Не найдено подходящих событий для переотправки».

Пример вызова (на основе subscriptions.xml, описанного выше):

curl -d @data.json -H "Content-Type: application/json" -X POST http://localhost:8090/admin/api/subscriptions/mergeSubscription1/resend

data.json:

{
    "filter" : "root.$id $in ['7294634218387406849','7294634226977341441'] && root.fromReference=='2' && root.referenceName=='client'",
    "timeRange" :
       {
         "from" : "2023-11-29T10:39:46.587",
         "to" : "2023-12-01T10:39:46.587"
       }
}

Пример ответа:

5

Внимание!

Если параметры подписки в subscriptions.xml были изменены после первичной публикации события, то при повторной отправке:

  • сообщение будет направлено на новый url/topic, если были изменены поля target и/или callback;

  • формат сообщения изменится, если было изменено поле template;

  • данные сообщения изменятся, если было изменено поле query;

  • сообщение не будет отправлено, если было изменено поле criteria и событие теперь не подходит под новое условие;

  • сообщение будет содержать новый состав заголовков, если было изменено поле headers.

Если на момент первичной публикации события для подписки не был задан атрибут idempotenceHeaderName, то и при повторной отправке соответствующий заголовок передан не будет, даже если к тому моменту атрибут idempotenceHeaderName будет добавлен в подписку.

Клиентский путь#

Порядок действий для подключения функционала subscription:

  1. Определить список событий для публикации.

  2. Добавить в проекте DataSpace в модель данных model.xml соответствующие этим событиям event с необходимым для публикации набором атрибутов.

  3. Создать в проекте DataSpace конфигурацию подписок subscriptions.xml, предусмотрев необходимые параметры для стендозависимой параметризации.

  4. В конфигурации подписок описать шаблон трансформации данных события (и результата выполнения query) в JSON для публикации.

  5. Сформировать файлы customer-properties-subscription.yaml и custom_property.conf.yml, тем самым определив стендозависимую конфигурацию.

  6. Выполнить сборку проекта DataSpace.

  7. Доработать приложение для создания событий в необходимых узлах бизнес-процесса (вместе с изменением бизнес-данных или независимо).

  8. Подготовить интеграционное окружение для публикации: кластер Kafka, topics, манифесты Kubernetes и Istio для сетевых взаимодействий, конфигурации Secret Management System для получения и ротации сертификатов и другое.

  9. Развернуть DataSpace и приложение, протестировать интеграцию.

  10. Провести нагрузочное тестирование, по результатам уточнить параметры масштабирования сервиса.

  11. Выпустить релиз приложения совместно с DataSpace.

Пример: передача в «Каталог продуктов» сведений об изменении клиента по продукту при слиянии/разъединении#

Входное состояние: приложение развернуто с DataSpace, настроено слияние/разъединение Master Data Management.

Порядок действий:

  1. Создать модель с событием слияния. model.xml:

    <model name="rko-product" version="0.1">
        <external-types>
            <external-type type="ClientEPK" merge-kind="Organization"/>
        </external-types>
    
        <class name="RKOProduct">
            <property name="curISO" type="String" length="3"/>
            <property name="contractNumber" type="String" length="64"/>
            <reference name="client" type="ClientEPK" description="По этой ссылке выполняется слияние"/>
        </class>
        <!-- добавляем в модель событие слияния/разъединения client в RKOProduct -->
        <event name="RKOMergeEvent" merge-event="true">
            <property name="rko" type="RKOProduct" parent="true"/>
        </event>
    </model>
    
  2. Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:

    <subscriptions xmlns="DataspaceSubscriptions">
        <subscription id="mergeRKOForPPK"
                      name="MergeRKOForPPK"
                      description="Событие о слиянии клиента продукта для каталога продуктов"
                      target="KAFKA"
                      eventType="RKOMergeEvent"
                      callback="PPK_KAFKA:${rko.merge.subscription.1.topic}"
                      validTill="9999-12-31T23:59:59.999Z"
                      async="false"
                      blocking="true">
            <criteria>
                coalesce(root.mergeFailed, false) != true
            </criteria>
            <query>
                {
                searchRKOProduct(cond: "it.$id=='${rko}'") {
                    elems {
                            id
                            contractNumber
                            curISO
                            client {
                                entityId 
                            }
                        }
                    }
                }
            </query>
            <!-- структура данных на входе преобразования -->
            <!--
            {
              "event": {
                "objectId": "someEventId11111 ",
                "creationTimestamp": "2023-04-01T22:22:22.001Z",
                "type": "RKOMergeEvent",
                "status": "processed",
                "lastChangeDate": "2023-04-01T22:22:23.551Z",
                "ownerId": "someOwnerId01",
                "rko": "1231415534646745",
                "mergeKind": "Organization",
                "referenceName": "client",
                "fromReference": "1999449494944077",
                "toReference": "1999449494944942"
              },
              "data": {
                "searchRKOProduct": {
                    "elems": {
                        "id": "1231415534646745",
                        "contractNumber": "123141553464",
                        "curISO": "RUB",
                        "client": "1999449494944942"
                    }
                }
              }
            }
            -->
            <!-- Ожидаемый результат -->
            <!--
            {
              "Contract": 
              {    
                "ProductCode": "RKO",    
                "ContractID": "1231415534646745",    
                "epkOrgId": "1999449494944942",    
                "ContractNumber": "123141553464",    
                "CurrencyIso": "RUB"
              }
            }
             В примере преобразования в качестве ProductCode подставляется константа "RKO"
            -->
            <template>
            [
                {
                    "operation": "shift",
                    "spec": {
                        "event": {
                            "rko": "Contract.ContractID",
                            "toReference": "Contract.epkOrgId"
                        },
                        "data": {
                            "searchRKOProduct": {
                                "elems": {
                                    "*": {
                                        "contractNumber": "Contract.ContractNumber",
                                        "curISO": "Contract.CurrencyIso"
                                    }
                                }
                            }
                        }
                    }
                },
                {
                    "operation": "default",
                    "spec": {
                        "Contract": {
                            "ProductCode": "RKO"
                        }
                    }
                }
            ]
         </template>
         <headers>
             -XTenantId=${rko.merge.subscription.1.tenantId}
         </headers>
     </subscription>
    </subscriptions>
    
  3. Настроить подключение к Kafka для публикации событий.

  4. Определить стендозависимую конфигурацию. customer-properties-subscription.yaml:

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: customer-properties
    data:
      customer.properties: |-
        rko.merge.subscription.1.topic={{ mergeRKOForPPK.topic.value }}
        rko.merge.subscription.1.tenantId={{ mergeRKOForPPK.tenantId.value }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.bootstrap-servers={{ mergeRKOForPPK.kafka.bootstrap.servers }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.acks={{ mergeRKOForPPK.kafka.acks }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.delivery-timeout-ms={{ mergeRKOForPPK.kafka.delivery.timeout.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.request-timeout-ms={{ mergeRKOForPPK.kafka.request.timeout.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.retry-backoff-ms={{ mergeRKOForPPK.kafka.retry.backoff.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.linger-ms={{ mergeRKOForPPK.kafka.linger.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.enable-idempotence={{ mergeRKOForPPK.kafka.enable.idempotence }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.max-in-flight-requests-per-connection={{ mergeRKOForPPK.kafka.max.in.flight.requests.per.connection }}
    

    custom_property.conf.yml:

    mergeRKOForPPK.topic.value=topic-name
    mergeRKOForPPK.tenantId.value=tenantId
    mergeRKOForPPK.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
    mergeRKOForPPK.kafka.acks=all
    mergeRKOForPPK.kafka.delivery.timeout.ms=5000
    mergeRKOForPPK.kafka.request.timeout.ms=200
    mergeRKOForPPK.kafka.retry.backoff.ms=500
    mergeRKOForPPK.kafka.linger.ms=0
    mergeRKOForPPK.kafka.enable.idempotence=true
    mergeRKOForPPK.kafka.max.in.flight.requests.per.connection=5
    # ...
    
  5. Собрать и развернуть приложение.

  6. Выполнить слияние.

Пример: уведомление о состоянии обработки заявки#

Порядок действий:

  1. Создать модель с событием изменения статуса заявки. model.xml:

    <model name="rko-product" version="0.2">   
      <external-types>
        <external-type type="ClientEPK" merge-kind="Individual"/>
      </external-types>
    
      <enum name="ApplicationStatus">
        <value name="DRAFT"/>
        <value name="SUBMITTED"/>
        <value name="APPROVED"/>
        <value name="REJECTED"/>
        <value name="CANCELLED"/>
      </enum>
      <class name="Application">
         <property name="code" type="String" length="32"/>
         <property name="name" type="String" length="254"/>
         <property name="ref" type="String" length="64" unique="true"/>
         <reference name="client" type="ClientEPK" description="По этому референсу выполняется слияние/разъединение"/>
         <property name="content" type="Text"/>
         <property name="applicationStatus" type="String" length="32"/>
      </class>   
      <!-- добавляем в модель событие изменения статуса заявки -->
      <event name="StatusChangeEvent">
         <property name="application" type="Application" parent="true" />
         <property name="reason" type="String" length="2048" description="Причина изменения статуса"/>
         <property name="eventUser" type="String" length="254" description="Идентификатор пользователя, действия которого привели к изменению статуса" />
      </event>
    </model>
    
  2. Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:

    <subscriptions xmlns="DataspaceSubscriptions">
        <subscription id="applicationStatusNotify"
                        name="applicationStatusNotify"
                        description="Событие об изменении статуса заявки"
                        target="REST"
                        eventType="StatusChangeEvent"
                        callback="${baseWebHookUrl}/api/v1/statusNotify"
                        validTill="9999-12-31T23:59:59.999Z"
                        maxRetryAttempts="${applications.subscription.1.nretry}"
                        timeoutMs="${applications.subscription.1.timeout}"
                        retryDelayMs="${applications.subscription.1.retrydelay}"
                        async="true" blocking="true">
            <query>
            {
             searchApplication(cond: "it.$id=='${application}'") {
              elems {
                    id
                    code
                    name
                    ref
                    applicationStatus
              }
             }
            }  
            </query>
            <!-- структура данных на входе преобразования -->
            <!--
            {
              "event": {
                "objectId": "someEventId555",
                "creationTimestamp": "2023-04-01T22:22:22.001Z",
                "type": "StatusChangeEvent",
                "status": "processed",
                "lastChangeDate": "2023-04-01T22:22:23.551Z",
                "ownerId": "someOwnerId02",
                "application": "1231415534646745",
                "reason": "Application successfully passed all checks and has been approved",
                "eventUser": "xxx-uuuuuuu-zzz"
              },
              "data": {
                "searchApplication": {
                    "elems": {
                        "id": "1231415534646745",
                        "code": "APP-X333",
                        "name": "Application for grant",
                        "ref": "APP-X333-113333332",
                        "applicationStatus": "APPROVED"
                    }
                }
              }
            }
            -->
            <!-- Ожидаемый результат -->
            <!--
            {
                "ApplicationReference": "APP-X333-113333332",    
                "ApplicationCode": "APP-X333",    
                "ApplicationName": "Application for grant",    
                "Status": "APPROVED",    
                "Reason": "Application successfully passed all checks and has been approved",
                "Timestamp": "2023-04-01T22:22:22.001Z"
            }
            -->
            <template>
            [
              {
                "operation": "shift",
                "spec": {
                    "event": {
                        "reason": "Reason"
                        "creationTimeStamp": "Timestamp"
                    },
                    "data": {
                        "searchApplication": {
                            "elems": {
                                "*": {
                                    "code": "ApplicationCode",
                                    "name": "ApplicationName",
                                    "ref": "ApplicationReference",
                                    "applicationStatus": "Status"
                                }
                            }
                        }
                    }
                 }
              }
            ]
            </template>
            <headers>
              -XTenantId=${applications.subscription.1.tenantId}
              -XchangeUser=${eventUser}
            </headers>
        </subscription>
    </subscriptions>
    
  3. Сконфигурировать подключение к REST API для публикации.

  4. Определить стендозависимую конфигурацию. customer-properties-subscription.yaml:

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: customer-properties
    data:
      customer.properties: |-
        baseWebHookUrl={{ applicationStatusNotify.baseWebHookUrl.value }}
        applications.subscription.1.nretry=1
        applications.subscription.1.timeout=10000
        applications.subscription.1.retrydelay={{ applicationStatusNotify.retrydelay.value }}
        applications.subscription.1.tenantId={{ applicationStatusNotify.tenantId.value }}
    

    custom_property.conf.yml:

    applicationStatusNotify.baseWebHookUrl.value=http://localhost:8082
    applicationStatusNotify.retrydelay.value=2000
    applicationStatusNotify.tenantId.value=tenantId
    # ...
    
  5. Собрать DataSpace.

  6. Задать стендозависимые параметры.

  7. Развернуть приложение.

  8. Вызвать /packet API DataSpace с командой create для event и командой установки applicationStatus в «APPROVED».

    {
      "jsonrpc": "2.0",
      "method": "execute",
      "id": 1,
      "params": {
        "packet": {
          "commands": [
            {
              "id": "updateApplication",
              "name": "update",
              "params": {
                "type": "Application",
                "id": "<идентификатор_заявки>",
                "applicationStatus": "APPROVED"
              }
            },
            {
              "id": "createStatusChangeEvent",
              "name": "create",
              "params": {
                "type": "StatusChangeEvent",
                "application": "<идентификатор_заявки>",
                "reason": "заявка обработана",
                "eventUser": "<идентификатор_пользователя>"
              }
            }
          ]
        }
      }
    }
    

Пример: публикация в topic Kafka события изменения суммы на счету AccountBalanceChangeEvent#

Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».

В модель предметной области добавить событие:

<!-- добавляем дочерний AccountBalanceChangeEvent -->
<event name="AccountBalanceChangeEvent" extends="BaseChangeEvent">
  <property name="account" type="Account" parent="true"/>
  <parents-property name="balance" />
</event>

Создать подписку:

<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
    <subscription id="AccountBalanceChange" name="AccountBalanceChange"
                  description="Публикация события об изменении суммы счета" validTill="2025-01-01T00:00:00.000Z"
                  target="KAFKA" callback="PPK_KAFKA:${project.account.subscription.1.topic}"
                  eventType="AccountBalanceChangeEvent"
                  async="false" blocking="true">
        <query>
            {
                searchAccount (cond: "it.id == ${account}") {
                    elems {
                        number
                        balance {
                            value
                            currency
                        }
                    }
                }
            }
        </query>
        <!-- структура данных на входе преобразования
        {
            "event": {
                "objectId": "someEventId11111",
                "creationTimestamp": "2023-04-01T22:22:22.001Z",
                "type": "AccountAmountChangeEvent",
                "status": "processed",
                "lastChangeDate": "2023-04-01T22:22:23.551Z",
                "ownerId": "someOwnerId01",
                "account": "12314155346467451",
                "sysVersion": 3,
                "sysTimeChanged": "2023-04-01T22:22:23.551Z"
            },
            "data": {
                "searchAccount":{
                    "elems": [
                        {
                            "number": "40817810500000000223"
                            "balance": {
                               "value": 100,
                               "currency": "810"
                            }
                        }
                    ]
				}
            }
        }
        -->
        <!-- Ожидаемый результат
        {
            "changeNo" : 3,
            "timeChanged" : "2023-04-01T22:22:23.551Z",
            "amount" : 100,
            "currency" : "810",
            "account" : "40817810500000000223"
        }
        -->
        <template>
            [
                {
                    "operation": "shift",
                    "spec": {
                        "event": {
                            "sysVersion": "changeNo",
                            "sysTimeChanged": "timeChanged"
                        },
                        "data": {
                            "searchAccount": {
                                "elems": {
                                    "*" : {
                                        "number": "account",
                                        "balance": {
                                            "value": "amount",
                                            "currency": "currency"
                                        }  
                                    }
                                }
                            }
                        }
                    }
                }
            ]
        </template>
    </subscription>
</subscriptions> 

Пример: публикация в topic Kafka события изменения состояния счета AccountStatusChangeEvent#

Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».

В модель предметной области добавить событие:

<!-- добавляем дочерний AccountStatusChangeEvent -->
<event name="AccountStatusChangeEvent" extends="BaseChangeEvent">
    <property name="account" type="Account" parent="true"/>
    <parents-property name="statusForAccounting" />
</event>

Создать подписку:

<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
    <subscription id="AccountStatusChange" name="AccountStatusChange"
                  description="Публикация события об изменении статуса счета" validTill="2025-01-01T00:00:00.000Z"
                  target="KAFKA" callback="LOCAL_KAFKA1:${project.account.subscription.2.topic}"
                  eventType="AccountStatusChangeEvent"
                  async="false" blocking="true">
        <query>
        {
            searchAccount (cond: "it.id == ${account}") {
                elems {
                    number
                    statusForAccounting {
                        code
                    }
                }
            }
        }
        </query>
        <!-- структура данных на входе преобразования
        {
          "event": {
            "objectId": "someEventId11111",
            "creationTimestamp": "2023-04-01T22:22:22.001Z",
            "type": "AccountStatusChangeEvent",
            "status": "processed",
            "lastChangeDate": "2023-04-01T22:22:23.551Z",
            "ownerId": "someOwnerId01",
            "account": "1231415534646745",
            "sysVersion": 3,
            "sysTimeChanged": "2023-04-01T22:22:23.551Z"
          },
          "data": {
              "searchAccount":{
                  "elems": [
                     {
                       "number": "40817810500000000223",
                        "statusForAccounting": {
                            "code"
                        }
                     }
				  ]
              }
          }
        }
        -->
        <!-- Ожидаемый результат
        {
            "changeNo": 3,
            "timeChanged": "2023-04-01T22:22:23.551Z",
            "status": "frozen",
            "account": "40817810500000000223"
        }
        -->
        <template>
        [
            {
                "operation": "shift",
                "spec": {
                    "event": {
                        "sysVersion": "changeNo",
                        "sysTimeChanged": "timeChanged"
                    },
                    "data": {
                        "searchAccount": {
                            "elems": {
                                "*" : {
                                    "number": "account",
                                    "statusForAccounting": {
                                        "code":"status"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        ]
        </template>
    </subscription>
</subscriptions>

Пример: репликация состояния счета в OpenSearch за счет публикации события AccountObjectEvent#

Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».

В модель предметной области добавить событие:

<!-- добавляем дочерний AccountObjectEvent -->
<event name="AccountObjectEvent" extends="BaseObjectEvent">
    <property name="account" type="Account" parent="true"/>
</event>

Создать подписку на создание и изменение счета и подписку на удаление счета:

<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
    <subscription id="AccountObjectEvent" name="AccountObjectEvent"
                  description="Репликация состояния счета в OpenSearch" validTill="2025-01-01T00:00:00.000Z"
                  target="REST" callback="PUT ${project.account.subscription.3.url}/_doc/${account}" 
				  maxRetryAttempts="10" 
				  timeoutMs="5000" 
				  retryDelayMs="10000"
                  eventType="AccountObjectEvent"
                  async="false" blocking="true">
        <criteria>root.sysObjectEvent != 'D'</criteria>
        <query>
        {
            searchAccount (cond: "it.id == ${account}") {
                elems {
                    number  
                    description
                    accountType
                    balance {
                        value
                        currency
                    }
                    client {
                        entityId
                    }
                    statusForAccounting {
                        code
                    }
                    branch {
                        code
                        location
                    }
                    statementInfo {
                        periodicity
                        title
                    }
                    tags {
                        elems
                    }
                    postings {
                        elems {
                            amount
                        }
                    }
                }
            }
        }
        </query>
        <!-- структура данных на входе преобразования         
        {
          "event": {
            "objectId": "someEventId11111",
            "creationTimestamp": "2023-04-01T22:22:22.001Z",
            "type": "AccountObjectEvent",
            "status": "processed",
            "lastChangeDate": "2023-04-01T22:22:23.551Z",
            "ownerId": "someOwnerId01",
            "account": "1231415534646745",
            "sysVersion": 3,
            "sysTimeChanged": "2023-04-01T22:22:23.551Z",
            "sysObjectEvent": "C"
          },
          "data": {
              "searchAccount":{
                  "elems": [
                     {
                      "number": "40817810500000000223",
                      "description": "Personal Account",
                      "accountType": "INDCUR",
                      "balance": {
                        "value" : 100.0,
                        "currency": "978"
                      },
                      "client": {
                        "entityId": "0123445"
                      },
                      "statusForAccounting": {
                        "code": "active"
                      },
                      "branch": {
                        "code": "ABBC",
                        "location": "Country, City, Street, Block"
                      },
                      "statementInfo": {
                        "periodicity": 30,
                        "title": "Account statement"
                      },
                      "tags": {"elems" : ["personal", "salary"]},
                      "postings": {"elems": [ {"amount":100.0},{"amount":-50.0}] }
                    }
                  ]
              }    
          }
        }
         -->
        <!-- Ожидаемый результат
        {
          "number" : "40817810500000000223",
          "description" : "Personal Account",
          "accountType" : "INDCUR",
          "balance" : {
            "amount" : 100,
            "currency" : "978"
          },
          "client" : "0123445",
          "statusForAccounting" : {
            "code" : "active"
          },
          "branch" : {
            "code" : "ABBC",
            "location" : "Country, City, Street, Block"
          },
          "statement" : {
            "periodicity" : 30,
            "title" : "Account statement"
          },
          "tags" : ["personal", "salary" ],
          "postings" : [ { "amount" : 100.0}, {"amount" : -50.0} ],
          "sysVersion" : 3,
          "sysTimeChanged" : "2023-04-01T22:22:23.551Z"
        }
        -->
        <template>
        [
          {
            "operation": "shift",
            "spec": {
                "data": {
                    "searchAccount": {
                        "elems": {
                            "*" : {
                              "number": "number",
                              "description": "description",
                              "accountType": "accountType",
                              "balance": { 
                                "value": "balance.amount",
                                "currency": "balance.currency"
                              },
                              "client": "client",
                              "statusForAccounting": "statusForAccounting",
                              "branch": "branch",
                              "statementInfo": "statement",
                              "tags": {"elems": "tags"},
                              "postings": {"elems":"postings"}
                            }
                        }
                    }
              },
              "event": {
                "sysVersion": "sysVersion",
                "sysTimeChanged": "sysTimeChanged"
              }
            }
          }
        ]
        </template>
    </subscription>
    <!-- Подписка на удаление -->
    <subscription id="AccountObjectDeleteEvent" name="AccountObjectDeleteEvent"
                  description="Репликация удаления счета в OpenSearch" validTill="2025-01-01T00:00:00.000Z"
                  target="REST" callback="DELETE ${project.account.subscription.3.url}/_doc/${account}" 
				  maxRetryAttempts="10" 
				  timeoutMs="5000" 
				  retryDelayMs="10000"
                  eventType="AccountObjectEvent"
                  async="false" blocking="true">
        <criteria>root.sysObjectEvent == 'D'</criteria>
        <!-- структура данных на входе преобразования         
        {
          "event": {
            "objectId": "someEventId11111",
            "creationTimestamp": "2023-04-01T22:22:22.001Z",
            "type": "AccountObjectEvent",
            "status": "processed",
            "lastChangeDate": "2023-04-01T22:22:23.551Z",
            "ownerId": "someOwnerId01",
            "account": "1231415534646745",
            "sysVersion": 3,
            "sysTimeChanged": "2023-04-01T22:22:23.551Z",
            "sysObjectEvent": "D"
          }
        }
        -->
    </subscription> 
</subscriptions>

Пример: публикация события AccountAmountAndStatusTracking в topic Kafka#

Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».

В модель предметной области добавить событие:

<!-- добавляем дочерний AccountTrackBalanceAndStatusEvent -->
<event name="AccountTrackBalanceAndStatusEvent" extends="BaseTrackingEvent">
  <property name="account" type="Account" parent="true"/>
  <parents-property name="balance" />
  <parents-property name="statusForAccounting.code" rename="statusCode"/>
</event>

Создать подписку:

<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
      <subscription id="AccountTrackBalanceAndStatusEvent" name="AccountTrackBalanceAndStatusEvent"
                  description="Публикация события об изменении суммы счета" validTill="2025-01-01T00:00:00.000Z"
                  target="KAFKA" callback="LOCAL_KAFKA1:${project.account.subscription.4.topic}"
                  eventType="AccountTrackBalanceAndStatusEvent"
                  async="false" blocking="true">
        <query>
        {
            searchAccount (cond: "it.id == ${account}") {
                elems {
                    number
                }
            }
        }
        </query>
        <!-- структура данных на входе преобразования
        {
            "event": {
                "objectId": "someEventId11111",
                "creationTimestamp": "2023-04-01T22:22:22.001Z",
                "type": "AccountTrackAmountAndStatusEvent",
                "status": "processed",
                "lastChangeDate": "2023-04-01T22:22:23.551Z",
                "ownerId": "someOwnerId01",
                "account": "12314155346467451",
                "sysVersion": 3,
                "sysTimeChanged": "2023-04-01T22:22:23.551Z",
     			"sysObjectEvent": "U",
				"sysChangeUser": "P01234412",
                "balance": {
                    "value": 100,
                    "currency": "810"
                },
				"statusCode": "open"
            },
         "data": {
              "searchAccount":{
                  "elems": [
                     {
                        "number": "40817810500000000223"
                     }
                  ]
               }   
            }
        }
        -->
        <!-- Ожидаемый результат
        {
            "changeNo" : 3,
            "timeChanged" : "2023-04-01T22:22:23.551Z",
            "amount" : 100,
            "currency" : "810",
			"status": "open",
            "account" : "40817810500000000223",
			"user": "P01234412"
        }
        -->
        <template>
        [
            {
                "operation": "shift",
                "spec": {
                    "event": {
                        "sysVersion": "changeNo",
                        "sysTimeChanged": "timeChanged",
                        "balance": {
                            "value": "amount",
                            "currency": "currency"
                        },
						"statusCode": "status",
						"sysChangeUser": "user"
                    },
                    "data": {
                        "searchAccount": {
                            "elems": {
                                "*" : {
                                    "number": "account"
                                }
                            }
                        }
                    }
                }
            }
        ]            
        </template>
    </subscription>
</subscriptions> 

Пример: аудит изменений счета — публикация события AccountSnapshotEvent в Аудит#

Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».

В модель предметной области добавить событие:

<!-- добавляем дочерний AccountSnapshotEvent -->
<event name="AccountSnapshotEvent" extends="BaseSnapshotEvent" snapshot-large-properties="true">
    <property name="account" type="Account" parent="true"/>
    <parents-property name="statusForAccounting.code" rename="statusCode"/>
    <parents-property name="branch.code" rename="branchCode"/>
    <parents-property name="status" rename="accountStatus"/>
    <parents-property name="statementInfo.periodicity" rename="statementPeriodicity"/>
</event>

Создать подписку:

<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
    <subscription id="AccountSnapshotEvent" name="AccountSnapshotEvent"
                  description="Аудит всех изменений счета" validTill="2025-01-01T00:00:00.000Z"
                  target="REST" callback="${project.account.subscription.5.url}/audit/postEvent" 
				  maxRetryAttempts="10" 
				  timeoutMs="5000" 
				  retryDelayMs="10000"
                  eventType="AccountSnapshotEvent"
                  async="false" blocking="true">
        <criteria>root.sysObjectEvent != 'D'</criteria>
        <!-- структура данных на входе преобразования         
        {
          "event": {
            "objectId": "someEventId11111",
            "creationTimestamp": "2023-04-01T22:22:22.001Z",
            "type": "AccountObjectEvent",
            "status": "processed",
            "lastChangeDate": "2023-04-01T22:22:23.551Z",
            "ownerId": "someOwnerId01",
            "account": "1231415534646745",
            "sysVersion": 3,
            "sysTimeChanged": "2023-04-01T22:22:23.551Z",
            "sysObjectEvent": "C",
 			"sysChangeUser": "P01234412",
            "accountGroup": "553453453555",
            "accountType": "INDCUR",
            "number": "40817810500000000223",
            "balance": {
               "value" : 100.0,
               "currency": "978"
            },
            "statementInfo": "61231313132312312",
            "branch": "1231313132312312",
            "client": "0123445",
            "description": "Personal Account",
            "hash": "AFFFCD02E1",
            "accountStatus":"active",
            "product": "345355555345553",
            "branchCode": "1234",
            "statusForAccounting":"4",
            "statusCode": "open",
            "statementPeriodicity": 7
          }
        }
        -->
        <!-- Ожидаемый результат
        {
          "userLogin" : "P01234412",
          "params" : {
            "_changeType" : "C",
            "accountId" : "1231415534646745",
            "number" : "40817810500000000223",
            "description" : "Personal Account",
            "accountType" : "INDCUR",
            "amount" : 100,
            "currency" : "978",
            "client" : "0123445",
            "statusCode" : "open",
            "branch" : "1234",
            "accountStatus" : "active",
            "statementPeriodicity" : 7,
            "checksum": "AFFFCD02E1"
          },
          "name" : "AccountChange",
          "userNode" : null,
          "metamodelVersion" : "0.1",
          "module" : "DSPC",
          "userName" : null,
          "session" : null
        }
        -->
        <template>
        [
          {
            "operation": "shift",
            "spec":{
              "event": {
                "createdAt": "sysTimeChanged",
                "sysChangeUser": "userLogin",
                "sysVersion": "params.sysVersion",
                "sysObjectEvent": "params._changeType",
                "account": "params.accountId",
                "number": "params.number",
                "description": "params.description",
                "accountType": "params.accountType",
                "balance": {
                  "value": "params.amount",
                  "currency": "params.currency"
                },
                "client": "params.client",
                "statusCode": "params.statusCode",
                "branchCode": "params.branch",
                "accountStatus": "params.accountStatus",
                "statementPeriodicity": "params.statementPeriodicity",
                "hash": "params.checksum"
              }
            }
          },
          {
            "operation": "default",
            "spec": {
              "metamodelVersion": "0.1",
              "name": "AccountChange",
              "session": null,
              "module": "DSPC",
              "userName": null,
              "userNode": null
            }
          }
        ]
        </template>
    </subscription>
</subscriptions>

Пример: публикация события об изменении истории счета в topic Kafka#

Модель данных для примера приведена в разделе «Модель данных для иллюстрации примеров работы с событиями отслеживания изменений».

В модель предметной области добавить событие:

    <event name="AccountHistoryEvent" extends="BaseHistoryEvent"> 
        <property name="account" type="Account" parent="true"/> <!-- Примечание: Тип `Account` историцируемый, т.к. есть историцируемые поля -->
    </event>

Создать подписку:

<?xml version="1.0" encoding="UTF-8" ?>
<subscriptions xmlns="DataspaceSubscriptions">
  <subscription id="AccountHistory" name="AccountHistory" description="Публикация события об изменении истории счета" validTill="2025-01-01T00:00:00.000Z"
    target="KAFKA" callback="LOCAL_KAFKA1:${project.account.subscription.1.topic}"
    eventType="AccountHistoryEvent"                
    async="false" blocking="true">
    <query>
    { 
        merge { 
            elems {
                ... on Account @mergeReqSpec(cond: "it.$id=='${account}'") {
                    id
                   number
                   accountType
                 }
                ... on AccountHistory @mergeReqSpec(cond: "it.sysHistoryOwner.id=='${account}' &amp;&amp; it.sysHistNumber==${sysVersion}") {
                    sysHistNumber
                    balance {
                        value
                        sysValueUpdated
                        currency
                        sysCurrencyUpdated
                    }
                }
            }
        } 
    }
    </query>
        <!-- структура данных на входе преобразования
        {
          "event": {
            "objectId": "someEventId11111 ",
            "creationTimestamp": "2023-04-01T22:22:22.001Z",
            "type": "AccountHistoryEvent",
            "status": "processed",
            "lastChangeDate": "2023-04-01T22:22:23.551Z",
            "ownerId": "someOwnerId01",
            "account": "1231415534646745",
            "sysVersion": 3,
            "timeChanged": "2023-04-01T22:22:23.551Z"
          },
         "data": {
              "merge":{
                  "elems": [
                     {
                        "id": "1231415534646745",
                        "number": "40817810500000000223",
                        "accountType": "CACC"
                     },
                     {
                         "sysHistNumber": 4,
                         "balance": {
                            "value": 1000.00,
                            "sysValueUpdated": true,
                            "currency": null,
                            "sysCurrencyUpdated": false
                        }
                     } 
                  ]               
              }
            }
        }
        -->
        <!-- Ожидаемый результат
        {
            "changeNo": 3,
            "timeChanged": "2023-04-01T22:22:23.551Z",
            "account": "40817810500000000223",   
            "amount": 1000.0,   
            "currency": null   
        }
        -->    
        <template>        
        [
          {
            "operation": "shift",
            "spec": {
              "event": {
                "sysVersion": "changeNo",
                "timeChanged": "timeChanged"
              },
            "data": {
                "merge": {
                    "elems": {
                        "*" : {
                             "number": "account",
                             "balance": {
                                "value": "amount",
                                "currency": "currency"
                             }
                        }
                    }
                }
             } 
          }
       }
     ]
     </template>
  </subscription>
</subscriptions>