Руководство по работе с сервисом DataSpace Subscription#

О документе#

В составе продукта Platform V DataSpace (APT) поставляется сервис DataSpace Subscription, реализующий универсальный (повторно используемый разными потребителями) механизм, позволяющий DataSpace участвовать в событийном обмене (выступать в роли провайдера событий).

В данном документе приведено описание настройки сервиса DataSpace Subscription и работы с ним.

Расшифровку основных понятий см. в документе "Термины и определения".

Описание механизма Subscription#

События#

Предусмотрена возможность формирования событий (Event) по видам изменений:

  • Создание событий в пакете команд и mutation GraphQL.

  • Проведение операции слияния/разъединения для заданного класса.

Состав полей события и его привязку к другим объектам определяет потребитель DataSpace в модели своей предметной области при помощи объектов вида Event.

Подписки и публикация событий#

Предоставлена возможность создания подписки на события. В подписке задаются:

  • тип события;

  • способ публикации — через topic Kafka или через webhook (вызов REST API);

  • спецификация преобразования данных события в тело сообщения;

  • запрос (query), который должен быть выполнен при соответствующем событии для получения дополнительных данных для публикации;

  • дополнительный фильтр по данным события, если фильтр не вернул данные, то публикация не осуществляется;

  • таймаут и количество retry при публикации через webhook;

  • возможность определить дополнительные заголовки (headers), содержимое которых может быть задано явно или параметризовано данными события (Event).

Тело сообщения для публикации представляет собой JSON, формируемый по заданному в подписке преобразованию в JSON данных Event и полей из результирующего query, по умолчанию для преобразования JSON в JSON используется библиотека JOLT.

События слияния/разъединения#

Если сконфигурировать подписку на события слияния/разъединения, то при выполнении слияния/разъединения сформируется событие слияния/разъединения, которое будет опубликовано в соответствии с подпиской.

Для этого в модели предметной области необходимо объявить Event с признаком merge-event="true", связанный с классом, содержащим референс, обрабатываемый при слиянии/разъединении. Такие Event не доступны для создания через пакет команд или mutation GraphQL. В качестве значения тенанта (атрибут ownerId) для таких событий устанавливается значение, равное значению тенанта того агрегата, в рамках которого произошли изменения.

Если процесс слияния/разъединения завершился успешно, то формируется Event со значением поля mergeFailed = false.

Предусмотрена возможность создания события в том случае, если процесс слияния/разъединения завершился с ошибкой, которая требует постороннего вмешательства. За это отвечает настройка сервиса дедубликации duplication.service.enable-failed-merge-events. По умолчанию формирование таких событий выключено: duplication.service.enable-failed-merge-events=false.

Если перевести настройку duplication.service.enable-failed-merge-events сервиса дедубликации в значение true, то при возникновении ошибки процесса слияния/разъединения будут создаваться события со значением поля mergeFailed = true. При этом также заполняется поле errorCode — код ошибки и errorMessage — сообщение об ошибке.

Виды ошибок, при которых формируется Event со значением поля mergeFailed = true предопределены.

Доступен вид ошибок UNIQ — ошибка нарушения уникальности.

Для подписки на неуспешные события слияния/разъединения необходимо использовать фильтр root.mergeFailed == true в поле criteria для подписки.

Примечание

Если функциональность обработки событий слияния/разъединения клиентов первый раз используется потребителем, начиная с версии компонентов DataSpace 1.11.0 и выше, и если настройка сервиса дедубликации duplication.service.enable-failed-merge-events установлена в значение true, то для подписки на успешные события слияния/разъединения достаточно использовать фильтр root.mergeFailed == false в поле criteria для подписки.

Eсли данная функциональность использовалась до версии 1.11.0, и если настройка сервиса дедубликации duplication.service.enable-failed-merge-events установлена в значение true, то следует использовать фильтр coalesce(root.mergeFailed, false) != true в поле criteria для подписки.

Диаграммы событийного обмена#

@startuml
title Публикация событий
participant "Приложение" as app
participant "DataSpace-Core" as core
database "DataSpace DB" as db
participant "DataSpace-Subscription" as subs
queue "topic Kafka" as topic
participant "Подписчик 1" as subscriber1
participant "Подписчик N" as subscribern
app -> core : пакет с командами\ncreateEvent
core -> db : сохранить\nсобытие
db -> subs : прочитать\nновые события
subs -> db : выполнить запрос\nзаданный в подписке 1
subs -> topic : опубликовать\nсобытие через\ntopic Kafka 
topic -> subscriber1 : прочитать\nсобытие  
subs -> db : выполнить запрос\nзаданный в подписке N
subs -> subscribern : опубликовать\nсобытие через\nREST API
subs -> db : по прошествии\nзаданного периода\nудалить все данные\nсобытия и публикации 
@enduml

@startuml
title Публикация событий при слиянии/разъединении
participant "Master Data Management" as app
participant "DataSpace-StateMachine" as stm
participant "DataSpace-ReferenceUpdater" as reup
database "DataSpace DB" as db
participant "DataSpace-Subscription" as subs
queue "topic Kafka" as topic
participant "Подписчик" as subscriber1
app -> stm : событие\nо слиянии клиентов
stm -> db : создать задачу\nна слияние
db -> reup : получить и выполнить\nзадачу на слияние
reup -> db : найти все\nобъекты для слияния
reup -> db : для каждого объекта\n — заменить значение\nссылки на новое,\nсформировать событие слияния
db -> subs : прочитать\nновые события
subs -> db : выполнить запрос\nзаданный в подписке
subs -> topic : опубликовать\nсобытие через\ntopic Kafka 
topic -> subscriber1 : прочитать\nсобытие  
subs -> db : по прошествии\nзаданного периода\nудалить все данные\nсобытия и публикации 
@enduml

Требования к модели#

В модели данных необходимо описать события (event), которые будут обрабатываться при работе сервиса DataSpace Subscription.

Событие может выступать как в роли самостоятельного агрегата, так и являться частью одного из существующих агрегатов в модели, для этого у события должна быть объявлена ссылка на родительский класс (parent="true"). Событие, у которого отсутствует родительская ссылка, является самостоятельным агрегатом.

Отличия Event от класса:

  • для него определена только операция создания;

  • служит только для подписки на него;

  • не наследуется и имеет плоскую структуру;

  • допустима ссылка на родительский класс для формирования агрегатной связи. Event, у которого отсутствует родительская ссылка, является самостоятельным агрегатом;

  • обработанный всеми подписками Event автоматически удаляется по истечении времени;

  • не реплицируется в хранилище данных и не отражается в логической модели для хранилища данных.

Пример описания события, являющегося самостоятельным агрегатом:

<event name="MyEvent">
    <property name="message" type="String"/>
</event>

Пример события, которое входит в агрегат Product:

<model>
  <class name="Product" label="Продукт" lockable="true">
    <property name="name" type="String" label="Наименование"/>
  </class>

  <event name="ProductEvent">
    <property name="message" type="String"/>
    <property name="product" type="Product" parent="true"/>
  </event>
</model>

Отдельным видом событий являются события слияния/разъединения. В модели они обозначаются признаком merge-event="true". Для событий слияния/разъединения обязательно наличие ссылки на родительский класс:

<model>
    <external-types>
        <external-type type="Client" merge-kind="Organization"/>
    </external-types>
    
    <class name="Product" label="Продукт" lockable="true">
        <property name="name" type="String" label="Наименование"/>
        <property name="code" type="String" label="Код"/>
        <property name="oneToOneLink" type="One2OneClassLink" mappedBy="product" label="o2o связь"/>
        <property name="services" type="Service" collection="set" mappedBy="product" label="Коллекция сервисов"/>
    </class>

    <class name="Service" label="Сервис" lockable="true">
        <property name="name" type="String" label="Наименование"/>
        <property name="code" type="String" label="Код"/>
        <property name="product" type="Product" label="продукт" parent="true" index="true"/>
    </class>

    <class name="One2OneClassLink" label="Класс для связи o2o">
        <property name="name" type="String" label="Наименование"/>
        <property name="product" type="ProductParty" parent="true"/>
    </class>

    <event name="ProductClientMergeEvent" merge-event="true">
        <reference name="client" type="Client" label="Идентификатор клиента"/>
        <property name="product" type="Product" parent="true"/>
    </event>
</model>

Для свойств событий допустимо указание тегов: name, type, parent, label, length, precision, mandatory, default-value.

События слияния/разъединения создаются автоматически сервисом DataSpace ReferenceUpdater.

Спецификация подписок#

Спецификация подписок представляет собой xml-файл subscriptions.xml, размещенный в папке model рядом с model.xml. При сборке проекта файл попадает в конфигурацию DataSpace Subscription. При старте сервис DataSpace Subscription читает этот файл и загружает подписки в таблицу T_DSPC_SYS_SUBSCRIPTION.

Пример:

<subscriptions xmlns="DataspaceSubscriptions">
  <subscription id="mergeSubscription1" 
                name="EPKMergeForProduct" 
                description="Публикация события с слиянии во внешнюю систему" 
                target="REST" 
                eventType="ProductClientMergeEvent" 
                callback="${my.merge.subscription.1.url}/api/v1/products" 
                validTill="20240101T00:00:00Z" 
                maxRetryAttempts="${my.merge.subscription.1.nretry}" 
                timeoutMs="${my.merge.subscription.1.timeout}" 
                retryDelayMs="${my.merge.subscription.1.retrydelay}" 
                async="false" 
                blocking="true"
                idempotenceHeaderName="requestUID">
    <criteria>
    root.referenceName=='client' &amp;&amp; coalesce(root.mergeFailed, false) != true
    </criteria>
    <query>
        {
        searchProduct(cond: "it.$id=='${product}'") {
            elems {
                code
                name
                services {
                    elems {
                        code
                        name
                    }
                }
                oneToOneLink {
                    name
                }
            }
          }
        }
    </query>
    <template>
        [
        {
            "operation": "shift",
            "spec": {
                "data": {
                    "searchProduct": {
                        "elems": {
                            "*": {
                                "code": "ProductCodeMappedTo",
                                "oneToOneLink": {
                                    "name": "oneToOneLinkNameMappedTo"
                                },
                                "services": {
                                    "elems": {
                                        "*": {
                                            "code": "servicesMappedTo"
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        ]
    </template>
    <headers>
      -XTenantId=${my.merge.subscription.1.tenantId}
    </headers>
  </subscription>
</subscriptions>

Описание полей:

  • id — уникальный идентификатор подписки, используется в качестве primary key в таблице подписок T_DSPC_SYS_SUBSCRIPTION.

    Внимание!

    Значение "0" зарезервировано системой.

  • name — наименование подписки.

  • description — текстовое описание подписки.

  • target — способ публикации (REST,KAFKA,GRAPHQLSUBS).

  • eventType — тип событий (entity name), на которые осуществляется подписка.

  • callback — URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_topic> в случае Kafka.

  • validTill — время окончания действия подписки.

  • maxRetryAttempts — количество повторных попыток публикации (актуально только для target == REST).

  • timeoutMs — тайм-аут публикации, в миллисекундах (актуально только для target == REST).

  • retryDelayMs — задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST).

  • async — использование асинхронной отправки (true/false).

  • blocking — приостановление отправки при первой ошибке (true/false).

  • idempotenceHeaderName — наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности. При всех попытках отправки одного и того же сообщения заголовок принимает одинаковое значение. Если атрибут не задан или значение пустое — заголовок не формируется.

  • criteria — строковое выражение для фильтрации записей события. С описанием формата строковых выражений можно ознакомиться в документе "Строковые выражения".

  • query — текст GraphQL-запроса для чтения дополнительных данных. С GraphQL-API можно ознакомиться в документе "Протокол GraphQL". Текст запроса может содержать placeholder формата ${<наименование_свойства_события>}. Перед выполнением GraphQL-запроса, placeholder будут заменены на соответствующие значения свойств события. <наименование_свойства_события> — наименование свойства JPA-класса события entity, name которого прописан в поле eventType у конкретного subscription в subscriptions.xml.

  • template — спецификация трансформации JSON-данных в JSON-сообщение. Для трансформации по умолчанию используется библиотека JOLT, поэтому для описания преобразования в параметре template используется формат спецификации этой библиотеки. Исходные данные для трансформации по шаблону имеют общий вид:

    {
      "event": {
        <атрибуты события>
      },
      "data": {
        <результат выполнения graphql-запроса. Если graphql-запрос в подписке не задан или не вернул данных  элемент пустой>
      }
    }
    
  • headers — спецификация дополнительных заголовков сообщения. Формат: <наименование_заголовка>=<значение>. <значение> может содержать Placeholder формата ${<наименование_свойства_события>}, которые будут заменены аналогично логике, описанной для поля query.

Примечание

На старте сервиса DataSpace Subscription проводится валидация полей criteria,query, template. Если валидация не проходит, сервис завершает свою работу.

Внимание!

Поля criteria,query, template и headers не могут содержать пустое значение. Если не требуется задавать значение для данных полей, то не нужно указывать соответствующие теги.

Стендозависимая конфигурация subscriptions.xml#

Потребитель может использовать в значениях полей target, callback, template, headers, maxRetryAttempts, timeoutMs, retryDelayMs Placeholder, которые заполняться при загрузке модуля из конфигурации приложения. Конфигурация приложения может быть параметризована и сделана стендозависимой.

Для этого необходимо добавить в файл customer-properties.subscription.yaml список параметров со значениями. В качестве значения может быть использован также параметр развертывания, например, {{tenantId}}. Эти параметры должны быть перечислены также в файле custom_property.conf.yml, могут быть как со значениями по умолчанию, так и без значений. При установке на стенд этим параметрам должны быть заданы значения в стендозависимой конфигурации.

Подробнее см. в документе "Руководство по установке" в следующих разделах:

  • "Добавление пользовательских параметров в дистрибутив".

  • "Использование пользовательских параметров в рамках сервиса DataSpace".

Пример customer-properties.subscription.yaml:

kind: ConfigMap
apiVersion: v1
metadata:
  name: customer-properties
data:
  customer.properties: |-
    my.merge.subscription.1.url={{ customer.value1 }}
    my.merge.subscription.1.nretry=1
    my.merge.subscription.1.timeout={{ customer.value2 }}
    my.merge.subscription.1.retrydelay=2000
    my.merge.subscription.1.tenantId={{ customer.value3 }}

Пример custom_property.conf.yml:

customer.value1=http://localhost:8082
customer.value2=10000
customer.value3=tenantId
# ...

Гарантия последовательности публикации событий

Для гарантии последовательной отправки событий по агрегату необходимо для подписки:

  • выключить async-режим: async="false";

  • включить blocking-режим: blocking="true".

Комбинацию async="true" blocking="true" использовать не следует, т.к. последовательность обработки в этом случае не гарантируется.

Процессы и масштабирование#

Основную логику сервиса DataSpace Subscription реализуют процессы EVENT_TRANSFER и EVENT_WORKER.

Задача процесса EVENT_TRANSFER состоит в том, чтобы для каждого события, по которому есть валидные подписки, сформировать записи в таблице очереди на публикацию T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего событие считается обработанным. При этом для всех выбранных новых Event (для которых нашлись валидные подписки и были созданы записи в T_DSPC_SYS_EVENT_QUEUE_ITEM, а также для которых подписок не нашлось) статус меняется на PROCESSED.

Процесс EVENT_WORKER обрабатывает таблицу очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM: читает соответствующее событие с применением условия criteria подписки, выполняет запрос дополнительных данных, используя query, трансформирует данные в итоговое сообщение по шаблону template и осуществляет публикацию сообщения на основании target и callback.

Предусмотрена возможность масштабирования процессов EVENT_TRANSFER и EVENT_WORKER. За количество процессов, которые будут запущены, отвечают настройки dataspace.subscription.event-transfer.transfer-processes-count и dataspace.subscription.event-worker.worker-processes-count соответственно. Реализована возможность распределения процессов по партициям для того, чтобы процессы обрабатывали каждый свой набор данных. За количество партиций для процессов EVENT_TRANSFERи EVENT_WORKER отвечают настройки dataspace.subscription.event-transfer.partitions-number и dataspace.subscription.event-worker.partitions-number соответственно. Партиции ведутся в разрезе подписок. Для процессов EVENT_TRANSFER зарезервирован идентификатор подписки со значением "0".

Внимание!

Настройки количества партиций для процессов EVENT_TRANSFER и EVENT_WORKER должны иметь одинаковые значение для всех экземпляров сервиса DataSpace Subscription, работающих с одной БД. При рассогласовании значений данных настроек между разными экземплярами процессы будут брать в работу один и тот же набор данных, что приведет к нарушению порядка обработки событий.

Также запрещено менять данные настройки в runtime.

Служебная таблица T_DSPC_SYS_SUBSCRIPTION_WORKER содержит информацию о партициях в разрезе подписок. Недостающие партиции добавляет служебный процесс на старте сервиса. Процессы EVENT_TRANSFER и EVENT_WORKER в процессе своей работы проверяют таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER на предмет необрабатываемых партиций и берут их в работу, прописывая свой идентификатор в поле PROCESSID. Партиция считается необрабатываемой, если PROCESSID не установлен, либо ее heartbeat не обновлялся определенное время, которое задается в настройках dataspace.subscription.event-transfer.heartbeat-timeout-sec и dataspace.subscription.event-worker.heartbeat-timeout-sec для процессов EVENT_TRANSFER и EVENT_WORKER соответственно.

Набор данных разбивается по партициям при помощи предварительно рассчитанного хеша, который хранится в поле PARTITIONHASH в таблицах событий и очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM. При обработке записей процессы EVENT_TRANSFER и EVENT_WORKER вычисляют партицию, к которой принадлежит запись по формуле PARTITIONHASH mod <количество_партиций>. Если полученная партиция присутствует среди партиций, назначенных процессу, то он выполняет обработку записи.

Таблица T_DSPC_SYS_SUBSCRIPTION_PROCESS также способствует управлению масштабированием процессов. При старте сервиса процессы регистрируются в данной таблице, записывая свой идентификатор в поле OBJECT_ID. В процессе работы процессы пересчитывают значение поля MAXPARTITIONS, которое содержит максимальное количество партиций, обрабатываемых процессом, по следующей формуле: <количество_подписок> * <количество_партиций> / <количество_процессов>. Если процессу назначено больше партиций, чем MAXPARTITIONS, то он освободит нужное количество.

Поскольку EVENT_WORKER может обрабатывать последовательно партиции разных подписок, то при сбое одного из получателей может возникнуть задержка обработки для других подписок. Для того чтобы работоспособные получатели продолжали получать сообщения с неизменной скоростью, реализован специальный тип процессов — ERROR_WORKER.

Алгоритм действия сервиса при возникновении ошибки по одной из партиций подписки:

  1. Все партиции подписки, при обработке которой произошла ошибка, EVENT_WORKER переключает в состояние state = ERROR в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER и перестает их обрабатывать, установив значение processId = null.

  2. Процесс ERROR_WORKER выбирает для обработки партиции с состоянием ERROR или CIRCUIT_BREAKING, устанавливая свой processId для соответствующих партиций.

  3. Процесс ERROR_WORKER обрабатывает полученные партиции по алгоритму, аналогичному EVENT_WORKER.

  4. Если очередное событие было успешно обработано, то ERROR_WORKER должен переключить партицию в состояние state = UNASSIGNED и установить processId = null для того, чтобы ее снова подхватил EVENT_WORKER.

При этом за время восстановления в очереди на отправку могло накопиться значительное число событий, что также может повлиять на скорость отправки по тем подпискам, по которым ошибки отсутствуют. Поэтому перед переключением партиции ERROR_WORKER анализирует размер очереди по партиции и, если он выше порогового значения (dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold), то ERROR_WORKER продолжает обрабатывать партицию, пока размер очереди не станет ниже порогового значения.

Для процессов ERROR_WORKER также предусмотрена возможность масштабирования:

  • За количество процессов, которые будут запущены, отвечает настройка dataspace.subscription.event-worker.error-worker-processes-count.

  • Количество партиций для процессов ERROR_WORKER равно количеству партиций, установленному для процессов EVENT_WORKER (dataspace.subscription.event-worker.partitions-number).

Служебные таблицы#

T_DSPC_SYS_SUBSCRIPTION

Таблица подписок. Данные загружаются из файла subscriptions.xml при старте сервиса.

Поле

Описание

OBJECT_ID

идентификатор подписки, задает пользователь (значение "0" зарезервировано системой)

NAME

имя подписки (для отображения, включая labels мониторинга)

DESCRIPTION

описание подписки

EVENTTYPE

entity name событий, на которые осуществляется подписка

CRITERIA

строковое выражение для фильтрации записей события

QUERY

текст GraphQL-запроса для чтения дополнительных данных

TEMPLATE

спецификация трансформации JSON-данных в JSON-сообщение

TARGET

способ публикации (REST, KAFKA, GRAPHQLSUBS)

CALLBACK

URL для публикации сообщения (для REST) или <наименование_кластера>:<наименование_topic> в случае Kafka

HEADERS

спецификация дополнительных заголовков сообщения

IDEMPOTENCEHEADERNAME

наименование заголовка сообщения, в котором будет передан UUID-ключ идемпотентности

ASYNC

признак использования асинхронной отправки (true/false)

BLOCKING

признак приостановки отправки при первой ошибке (true/false)

MAXRETRYATTEMPTS

количество повторных попыток публикации (актуально только для target == REST)

RETRYDELAYMS

задержка между повторными попытками публикации, в миллисекундах (актуально только для target == REST)

TIMEOUTMS

тайм-аут публикации, в миллисекундах (актуально только для target == REST)

VALIDTILL

время окончания действия подписки

SYS_OWNERID

владелец подписки (ID тенанта)

SYS_LASTCHANGEDATE

дата последнего изменения записи

T_DSPC_SYS_EVENT_QUEUE_ITEM

Таблица очереди обработки событий. Заполняется процессами EVENT_TRANSFER. Обрабатывается процессами EVENT_WORKER.

Поле

Описание

OBJECTID_EVENTID

ссылка на событие (часть первичного ключа)

OBJECTID_SUBSCRIPTIONID

ссылка на подписку (часть первичного ключа)

EVENTTYPE

entity name события

STATUS

статус обработки элемента очереди событий (NEW, SENT, ERROR, SKIP)

AGGREGATEROOTTYPE

тип корня агрегата Event, определяется по метаданным

AGGREGATEROOTID

идентификатор корня агрегата события

PARTITIONHASH

предварительно рассчитанный хеш для деления по партициям

IDEMPOTENCEHEADER

UUID-значение, формируемое при создании публикации и передаваемое в специальном заголовке при публикации сообщения

CREATIONTIMESTAMP

время добавления события в очередь

SYS_LASTCHANGEDATE

дата последнего изменения записи

Значения поля STATUS:

  • NEW — новый элемент очереди событий, еще не был обработан;

  • SENT — элемент очереди событий успешно обработан, сообщение отправлено;

  • ERROR — произошла ошибка при обработке, элемент будет обработан повторно;

  • SKIP — подходящее событие не найдено в процессе обработки элемента очереди событий (скорее всего событие не прошло по условию, указанному в поле criteria подписки).

T_DSPC_SYS_SUBSCRIPTION_WORKER

Поле

Описание

OBJECTID_PARTITIONNUMBER

номер партиции (часть первичного ключа)

OBJECTID_SUBSCRIPTIONID

ссылка на подписку (часть первичного ключа)

HEARTBEAT

время последнего обновления

PROCESSID

идентификатор процесса

STATE

состояние обработки партиции (ACTIVE, CIRCUIT_BREAKING, UNASSIGNED, ERROR)

T_DSPC_SYS_SUBSCRIPTION_PROCESS

Поле

Описание

OBJECT_ID

идентификатор процесса (первичный ключ)

MAXPARTITIONS

максимальное количество партиций, обрабатываемых процессом

KIND

вид процесса (worker/перекладчик) (EVENT_WORKER, EVENT_TRANSFER)

HEARTBEAT

время последнего обновления

Конфигурация#

dataspace.subscription.idempotenceHeaderUuidWithHyphens
#Значение по умолчанию: "true"
#true — UUID для заголовка идемпотентности idempotenceHeaderName формируется с дефисами, в результате его размер равен 36 символам
#false — UUID для заголовка идемпотентности idempotenceHeaderName формируется без дефисов, в результате его размер равен 32 символам

dataspace.subscription.event-transfer.transfer-processes-count
#Значение по умолчанию: 1
#Количество процессов перекладчика (EVENT_TRANSFER)

dataspace.subscription.event-transfer.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы перекладчика

dataspace.subscription.event-transfer.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов перекладчика (EVENT_TRANSFER)

dataspace.subscription.event-transfer.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс перекладчика

dataspace.subscription.event-transfer.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс перекладчика будет считаться неактивным

dataspace.subscription.event-transfer.max-transfer-count
#Значение по умолчанию: 1000
#Количество событий, обрабатываемых перекладчиком за один цикл работы

dataspace.subscription.event-worker.worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER

dataspace.subscription.event-worker.error-worker-processes-count
#Значение по умолчанию: 1
#Количество процессов EVENT_WORKER, обрабатывающего ошибки (ERROR_WORKER)

dataspace.subscription.event-worker.schedule-delay-ms
#Значение по умолчанию: 5 миллисекунд
#Задержка между запусками циклов работы процесса EVENT_WORKER

dataspace.subscription.event-worker.partitions-number
#Значение по умолчанию: 16
#Количество партиций (степень параллелизма) для процессов EVENT_WORKER

dataspace.subscription.event-worker.heartbeat-timeout-sec
#Значение по умолчанию: 5 секунд
#Время с момента последнего обновления heartBeat-партиции в таблице T_DSPC_SYS_SUBSCRIPTION_WORKER, по истечении которого партиция будет считаться необрабатываемой и ее может брать в работу процесс EVENT_WORKER

dataspace.subscription.event-worker.process-alive-timeout-ms
#Значение по умолчанию: 5000 миллисекунд
#Время с момента последнего обновления heartBeat-процесса в таблице T_DSPC_SYS_SUBSCRIPTION_PROCESS, по истечении которого процесс EVENT_WORKER будет считаться неактивным

dataspace.subscription.event-worker.circuit-breaker-error-count-threshold
#Значение по умолчанию: 10
#Количество ошибок по подписке, по достижении которого включается circuit breaker на эту подписку

dataspace.subscription.event-worker.circuit-breaker-timeout-ms
#Значение по умолчанию: 30000 миллисекунд
#Время с момента включения circuit breaker на подписку, по истечении которого circuit breaker будет выключен

dataspace.subscription.event-worker.error-worker-queue-size-switching-threshold
#Значение по умолчанию: 100
#Пороговое значение размера очереди накопившихся сообщений по подписке
#Пока размер очереди выше указанного значения порога, `ERROR_WORKER` не будет переключать партицию обратно на `EVENT_WORKER`

dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker
#Значение по умолчанию: 5
#Максимальное количество потоков (на процесс) в пуле потоков, используемом для асинхронной отправки
#Результирующее максимальное количество потоков равно:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)

dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker
#Значение по умолчанию: 1000
#Максимальное количество элементов в очереди (на процесс) пула потоков, используемом для асинхронной отправки
#Результирующий размер очереди равен:
#dataspace.subscription.event-worker.kafka-async-publisher-pool-queue-size-per-worker * (dataspace.subscription.event-worker.worker-processes-count + dataspace.subscription.event-worker.error-worker-processes-count)


dataspace.subscription.service-tasks.pool-size
#Значение по умолчанию: 5
#Количество потоков в пуле служебных процессов

dataspace.subscription.service-tasks.delete-dead-processes-schedule-delay-ms
#Значение по умолчанию: 10000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который удаляет неактивные процессы EVENT_TRANSFER и
#EVENT_WORKER из таблицы T_DSPC_SYS_SUBSCRIPTION_PROCESS

dataspace.subscription.service-tasks.load-subscriptions-schedule-delay-ms
#Значение по умолчанию: 1000 миллисекунд
#Задержка между запусками циклов работы служебного процесса, который выполняет загрузку подписок из файла subscriptions.xml
#в таблицу T_DSPC_SYS_SUBSCRIPTION, а также вставку недостающих партиций в таблицу T_DSPC_SYS_SUBSCRIPTION_WORKER
#После успешной загрузки процесс перестает запускаться

dataspace.subscription.service-tasks.cleanup-depth-hr
#Значение по умолчанию: 48 часов
#Максимальное время хранения записей в таблицах событий и таблице очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM, после чего
#записи будут удалены фоновым процессом, если они уже были обработаны: status == PROCESSED для событий 
#и status == SENT для записей T_DSPC_SYS_EVENT_QUEUE_ITEM

dataspace.subscription.service-tasks.cleanup-cron
#Значение по умолчанию: 0 0 0 * * * (каждый день в 00:00)
#Расписание запуска служебного процесса, который удаляет отработанные старые записи из таблиц событий и
#таблицы очереди событий T_DSPC_SYS_EVENT_QUEUE_ITEM

dataspace.subscription.service-tasks.tasks-schedule-initial-delay-ms
#Значение по умолчанию: 90000 миллисекунд
#Задержка перед запуском всех фоновых процессов на старте сервиса. Применяется для обеспечения корректной установки health-метрики
#readiness на старте сервиса.

dataspace.subscription.xml.enable-check-on-start
#Значение по умолчанию: true
#Включает/выключает валидацию полей query, criteria, template у подписок, описанных в subscription.xml при старте сервиса
#Только для стендов разработки! При эксплуатации в промышленных условиях должна быть установлена в значение "true"

Если предполагается использование функциональности публикации событий в Kafka, то необходимо задать настройки для каждого кластера Kafka в файле customer-properties.subscription.yaml в следующем виде:

Примечание

В случае использовании Helm-инсталляции в рамках раздела "Ручная установка сервиса" документа "Руководство по установке" необходимо создать kind: Secret в среде контейнеризации с параметрами, указанными ниже, и задать имя полученного секрета в параметр в Values.yml:

  appConfig:
   subscription:
      config: "cm-dspc-subscriptionstub"
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.bootstrap-servers=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.key-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.value-serializer=org.apache.kafka.common.serialization.StringSerializer
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.acks=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.delivery-timeout-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.request-timeout-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.retry-backoff-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.linger-ms=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotence=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.max-in-flight-requests-per-connection=

#Если требуется SSL:
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.security-protocol=SSL
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-key-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-location=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-keystore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-location=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-truststore-password=
dataspace.subscription.publisher.kafka.config.<наименование_кластера>.ssl-endpoint-identification-algorithm=

При необходимости параметры можно сделать стендозависимыми по аналогии со стендозависимой конфигурацией subscriptions.xml (см. раздел "Стендозависимая конфигурация subscriptions.xml").

Пример customer-properties.subscription.yaml:

kind: ConfigMap
apiVersion: v1
metadata:
  name: customer-properties
data:
  customer.properties: |-
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.bootstrap-servers={{ local.kafka.bootstrap.servers }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.acks={{ local.kafka.acks }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.delivery-timeout-ms={{ local.kafka.delivery.timeout.ms }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.request-timeout-ms=200
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.retry-backoff-ms=500
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.linger-ms={{ local.kafka.linger.ms }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.enable-idempotence={{ local.kafka.enable.idempotence }}
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.max-in-flight-requests-per-connection={{ local.kafka.max.in.flight.requests.per.connection }}

    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.security-protocol=SSL
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-key-password=
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-location=/deployments/credentials/kafkaSslKeystore
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-keystore-password=12456
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-location=/deployments/credentials/kafkaSslTrustwtore
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-truststore-password=54321
    dataspace.subscription.publisher.kafka.config.LOCAL_KAFKA.ssl-endpoint-identification-algorithm=

Пример custom_property.conf.yml:

local.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
local.kafka.acks=all
local.kafka.delivery.timeout.ms=5000
local.kafka.linger.ms=0
local.kafka.enable.idempotence=true
local.kafka.max.in.flight.requests.per.connection=5
# ...

Внимание!

При использовании SSL установка значения "true" для параметра dataspace.subscription.publisher.kafka.config.<наименование_кластера>.enable-idempotence может приводить к ошибке org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. Для того чтобы этого избежать, необходимо корректным образом настроить операцию IdempotentWrite для кластера. Подробнее см. в официальной документации продукта Kafka.

В файле subscriptions.xml target необходимо задавать в формате <наименование_кластера>:<наименование_topic>.

Пример: LOCAL_KAFKA:test-topic.

Retry#

Параметры retry/timeout для Kafka задаются в настройках Kafka на конкретный кластер <наименование_кластера>. Для этого необходимо использовать параметры delivery-timeout-ms, request-timeout-ms, retry-backoff-ms.

Параметры retry/timeout для REST берутся из параметров подписки: maxRetryAttempts, timeoutMs, retryDelayMs. Для ошибок с http status code 4xx retry не выполняется (то есть retry — только для 5xx и в случае тайм-аута).

Для идемпотентности потребитель может определить необходимый заголовок (передавая какое-либо из полей event с уникальным значением).

Circuit breaking#

На уровне параметра подписки blocking вводится настройка режима обработки ошибки:

  • blocking=true — при первой ошибке запрет на отправку остальных публикаций по партиции подписки.

  • blocking=false — разрешена публикация других событий по подписке до наступления условия приостановки.

Поведение при ошибках:

  1. Производится фиксация ошибки. Статус "ERROR" устанавливается для элемента очереди обработки событий T_DSPC_SYS_EVENT_QUEUE_ITEM.

  2. Если параметр blocking=false, то продолжается обработка следующих сообщений, при этом счетчик ошибок по подписке увеличивается.

  3. Если параметр blocking=true, то полностью приостанавливается отправка по этой партиции, при этом счетчик ошибок по подписке увеличивается.

  4. При накоплении порогового значения ошибок по подписке (настройка dataspace.subscription.event-worker.circuit-breaker-error-count-threshold равна, например, "10") приостанавливается отправка по этой подписке. При этом включается circuit breaker на всю подписку на один экземпляр приложения.

  5. Спустя время, указанное в настройке dataspace.subscription.event-worker.circuit-breaker-timeout-ms, производится попытка повторной отправки, начиная с тех, чья отправка завершилась с ошибкой.

  6. Успешная отправка сбрасывает счетчик ошибок.

Примечание

Каждый процесс EVENT_WORKER самостоятельно принимает решение о приостановке и возобновлении обработки партиции. Для того чтобы не останавливать весь поток при ошибке в данных одной из публикаций приостановится только одна партиция.

Мониторинг#

Мониторинг включается настройкой:

dataspace.subscription.metrics.enabled
#"false" — метрики выключены (значение по умолчанию)
#"true" — метрики включены

Реализованы следующие метрики мониторинга:

Наименование метрики

Тип метрики

Теги

Описание

dspc.subscription.handle.time

Timer

subscriptionId — идентификатор подписки, status — значение поля status T_DSPC_SYS_EVENT_QUEUE_ITEM на момент сбора метрики

Время обработки событий в разрезе подписки и результата отправки

dspc.subscription.publication.time

Timer

subscriptionId — идентификатор подписки, status — значение поля status T_DSPC_SYS_EVENT_QUEUE_ITEM на момент сбора метрики

Время отправки сообщений в разрезе подписки и результата отправки

dspc.subscription.errors.count

Counter

subscriptionId — идентификатор подписки

Количество ошибок отправки событий в разрезе подписки

dspc.subscription.skip.count

Counter

subscriptionId — идентификатор подписки

Количество отсеянных событий в разрезе подписки

dspc.subscription.transfer.count

Counter

Скорость перекладки событий

dspc.subscription.partition.state

Gauge

subscriptionId — идентификатор подписки, state — значение поля state T_DSPC_SYS_SUBSCRIPTION_WORKER на момент сбора метрики

Статус обработки партиций — количество по T_DSPC_SYS_SUBSCRIPTION_WORKER в разрезе state и subscriptionId

dspc.subscription.event.queue.size

Gauge

subscriptionId — идентификатор подписки, status — значение поля status T_DSPC_SYS_EVENT_QUEUE_ITEM на момент сбора метрики

Количество элементов очереди обработки событий в таблице T_DSPC_SYS_EVENT_QUEUE_ITEM в разрезе подписок и статусов.

dspc.subscription.new.events.count

Gauge

eventType — тип события

Отображает количество событий со статусом NEW каждого типа.

Метрики dspc.subscription.partition.state, dspc.subscription.event.queue.size и dspc.subscription.new.events.count обновляются фоновым scheduled-процессом. Частота обновления данных метрик регулируется настройкой:

dataspace.subscription.metrics.schedule-delay-ms
#Значение по умолчанию — 10000 миллисекунд

API повторной отправки событий#

REST API POST /admin/api/subscriptions/<id>/resend позволяет повторно отправить сообщения для событий, которые были уже обработаны ранее.

Внимание!

Переотправка в topic Kafka допустима только при единственной системе, читающей topic, по инициативе этой системы. Все остальные читатели могут получить ранее обработанные ими данные, в результате чего могут образоваться дубли.

Примечание

Переотправка возможна только для тех событий, по которым присутствуют записи в таблицах событий и очереди публикации (которые не были очищены). Необходимо задавать глубину очистки очередей событий с учетом требований по их возможной переотправке.

Примечание

При наличии в подписке GraphQL-query при повторной отправке он будет выполнен повторно на текущих данных. Как следствие — данные публикации могут отличаться от предыдущего вызова. Если для каких-то данных это критично, то необходимо завести соответствующие атрибуты в самом событии и заполнять их при создании события.

Параметры:

  • id — идентификатор подписки;

  • тело запроса формата json:

    {
        "filter" : "root.fromReference=='2' && root.referenceName=='client'",
        "timeRange" :
           {
             "from" : "2023-11-29T10:39:46.587",
             "to" : "2023-12-01T10:39:46.587"
           }
    }
    
  • filter — фильтр по атрибутам события, написанный на языке строковых выражений;

  • timeRange.from, timeRange.to — временной диапазон (учитывается поле creationTimestamp сущности события) для выбора событий, сообщения по которым будут отправлены повторно.

Требования к запросу:

  • Один из параметров filter или timeRange обязательно должен быть заполнен, иначе API вернет ошибку 405 Method Not Allowed с сообщением "Не задано ни одного условия отбора событий".

  • Если задан параметр timeRange, то to и from обязательны к заполнению. Если to или from не заполнены, то API вернет ошибку 400 Bad Request с сообщением "Поле timeRange#from обязательно для заполнения" или "Поле timeRange#to обязательно для заполнения".

При успешном вызове API возвращает код 200 OK и количество событий, по которым были повторно отправлены сообщения. Если не было найдено не одного события, подходящего под условие выборки, сформированного на основе переданных параметров, то API вернет ошибку 404 Not Found c сообщением "Не найдено подходящих событий для переотправки".

Пример вызова (на основе subscriptions.xml, описанного выше):

curl -d @data.json -H "Content-Type: application/json" -X POST http://localhost:8090/admin/api/subscriptions/mergeSubscription1/resend

data.json:

{
    "filter" : "root.$id $in ['7294634218387406849','7294634226977341441'] && root.fromReference=='2' && root.referenceName=='client'",
    "timeRange" :
       {
         "from" : "2023-11-29T10:39:46.587",
         "to" : "2023-12-01T10:39:46.587"
       }
}

Пример ответа:

5

Внимание!

Если параметры подписки в subscriptions.xml были изменены после первичной публикации события, то при повторной отправке:

  • сообщение будет направлено на новый url/топик, если были изменены поля target и/или callback;

  • формат сообщения изменится, если было изменено поле template;

  • данные сообщения изменятся, если было изменено поле query;

  • сообщение не будет отправлено, если было изменено поле criteria и событие теперь не подходит под новое условие;

  • сообщение будет содержать новый состав заголовков, если было изменено поле headers.

Если на момент первичной публикации события для подписки не был задан атрибут idempotenceHeaderName, то и при повторной отправке соответствующий заголовок передан не будет, даже если к тому моменту атрибут idempotenceHeaderName будет добавлен в подписку.

Клиентский путь#

Порядок действий:

  1. Определить список событий для публикации.

  2. Добавить в проекте DataSpace в модель данных model.xml соответствующие этим событиям Event с необходимым для публикации набором атрибутов.

  3. Создать в проекте DataSpace конфигурацию подписок subscriptions.xml, предусмотрев необходимые параметры для стендозависимой параметризации.

  4. В конфигурации подписок описать шаблон трансформации данных события (и результата выполнения query) в JSON для публикации.

  5. Сформировать файлы customer-properties.subscription.yaml и custom_property.conf.yml, тем самым определив стендозависимую конфигурацию.

  6. Выполнить сборку проекта DataSpace.

  7. Доработать приложение для создания событий в необходимых узлах бизнес-процесса (вместе с изменением бизнес-данных или независимо).

  8. Подготовить интеграционное окружение для публикации: кластер Kafka, topics, манифесты Kubernetes и Istio для сетевых взаимодействий, конфигурации Secret Management System для получения и ротации сертификатов и другое.

  9. Развернуть DataSpace и приложение, протестировать интеграцию.

  10. Провести нагрузочное тестирование, по результатам уточнить параметры масштабирования сервиса.

  11. Выпустить релиз приложения совместно с DataSpace.

Пример передачи в "Каталог продуктов" сведений об изменении клиента по продукту при слиянии/разъединении#

Входное состояние: приложение развернуто с DataSpace, настроено слияние/разъединение Master Data Management.

Порядок действий:

  1. Создать модель с событием слияния. model.xml:

    <model name="rko-product" version="0.1">
        <external-types>
            <external-type type="ClientEPK" merge-kind="Organization"/>
        </external-types>
    
        <class name="RKOProduct">
            <property name="curISO" type="String" length="3"/>
            <property name="contractNumber" type="String" length="64"/>
            <reference name="client" type="ClientEPK" description="По этой ссылке выполняется слияние"/>
        </class>
        <!-- добавляем в модель событие слияния/разъединения client в RKOProduct -->
        <event name="RKOMergeEvent" merge-event="true">
            <property name="rko" type="RKOProduct" parent="true"/>
        </event>
    </model>
    
  2. Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:

    <subscriptions xmlns="DataspaceSubscriptions">
        <subscription id="mergeRKOForPPK"
                      name="MergeRKOForPPK"
                      description="Событие о слиянии клиента продукта для каталога продуктов"
                      target="KAFKA"
                      eventType="RKOMergeEvent"
                      callback="PPK_KAFKA:${rko.merge.subscription.1.topic}"
                      validTill="9999-12-31T23:59:59.999Z"
                      async="false"
                      blocking="true">
            <criteria>
                coalesce(root.mergeFailed, false) != true
            </criteria>
            <query>
                {
                searchRKOProduct(cond: "it.$id==''${rko}''") {
                    elems {
                            id
                            contractNumber
                            curISO
                            client {
                                entityId 
                            }
                        }
                    }
                }
            </query>
            <!-- структура данных на входе преобразования -->
            <!--
            {
              "event": {
                "objectId": "someEventId11111 ",
                "creationTimestamp": "2023-04-01T22:22:22.001Z",
                "type": "RKOMergeEvent",
                "status": "processed",
                "lastChangeDate": "2023-04-01T22:22:23.551Z",
                "ownerId": "someOwnerId01",
                "rko": "1231415534646745",
                "aggregateRootId": "1231415534646745",
                "mergeKind": "Organization",
                "referenceName": "client",
                "fromReference": "1999449494944077",
                "toReference": "1999449494944942"
              },
              "data": {
                "searchRKOProduct": {
                    "elems": {
                        "id": "1231415534646745",
                        "contractNumber": "123141553464",
                        "curISO": "RUB",
                        "client": "1999449494944942"
                    }
                }
              }
            }
            -->
            <!-- Ожидаемый результат -->
            <!--
            {
              "Contract": 
              {    
                "ProductCode": "RKO",    
                "ContractID": "1231415534646745",    
                "epkOrgId": "1999449494944942",    
                "ContractNumber": "123141553464",    
                "CurrencyIso": "RUB"
              }
            }
             В примере преобразования в качестве ProductCode подставляется константа "RKO"
            -->
            <template>
            [
                {
                    "operation": "shift",
                    "spec": {
                        "event": {
                            "rko": "Contract.ContractID",
                            "toReference": "Contract.epkOrgId"
                        },
                        "data": {
                            "searchRKOProduct": {
                                "elems": {
                                    "*": {
                                        "contractNumber": "Contract.ContractNumber",
                                        "curISO": "Contract.CurrencyIso"
                                    }
                                }
                            }
                        }
                    }
                },
                {
                    "operation": "default",
                    "spec": {
                        "Contract": {
                            "ProductCode": "RKO"
                        }
                    }
                }
            ]
         </template>
         <headers>
             -XTenantId=${rko.merge.subscription.1.tenantId}
         </headers>
     </subscription>
    </subscriptions>
    
  3. Настроить подключение к Kafka для публикации событий.

  4. Определить стендозависимую конфигурацию. customer-properties.subscription.yaml:

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: customer-properties
    data:
      customer.properties: |-
        rko.merge.subscription.1.topic={{ mergeRKOForPPK.topic.value }}
        rko.merge.subscription.1.tenantId={{ mergeRKOForPPK.tenantId.value }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.bootstrap-servers={{ mergeRKOForPPK.kafka.bootstrap.servers }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.key-serializer=org.apache.kafka.common.serialization.StringSerializer
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.value-serializer=org.apache.kafka.common.serialization.StringSerializer
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.acks={{ mergeRKOForPPK.kafka.acks }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.delivery-timeout-ms={{ mergeRKOForPPK.kafka.delivery.timeout.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.request-timeout-ms={{ mergeRKOForPPK.kafka.request.timeout.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.retry-backoff-ms={{ mergeRKOForPPK.kafka.retry.backoff.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.linger-ms={{ mergeRKOForPPK.kafka.linger.ms }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.enable-idempotence={{ mergeRKOForPPK.kafka.enable.idempotence }}
        dataspace.subscription.publisher.kafka.config.PPK_KAFKA.max-in-flight-requests-per-connection={{ mergeRKOForPPK.kafka.max.in.flight.requests.per.connection }}
    

    custom_property.conf.yml:

    mergeRKOForPPK.topic.value=topic-name
    mergeRKOForPPK.tenantId.value=tenantId
    mergeRKOForPPK.kafka.bootstrap.servers=<список_адресов_брокеров_Kafka>
    mergeRKOForPPK.kafka.acks=all
    mergeRKOForPPK.kafka.delivery.timeout.ms=5000
    mergeRKOForPPK.kafka.request.timeout.ms=200
    mergeRKOForPPK.kafka.retry.backoff.ms=500
    mergeRKOForPPK.kafka.linger.ms=0
    mergeRKOForPPK.kafka.enable.idempotence=true
    mergeRKOForPPK.kafka.max.in.flight.requests.per.connection=5
    # ...
    
  5. Собрать и развернуть приложение.

  6. Выполнить слияние.

Пример уведомления о состоянии обработки заявки#

Порядок действий:

  1. Создать модель с событием изменения статуса заявки. model.xml:

    <model name="rko-product" version="0.2">   
      <external-types>
        <external-type type="ClientEPK" merge-kind="Individual"/>
      </external-types>
    
      <enum name="ApplicationStatus">
        <value name="DRAFT"/>
        <value name="SUBMITTED"/>
        <value name="APPROVED"/>
        <value name="REJECTED"/>
        <value name="CANCELLED"/>
      </enum>
      <class name="Application">
         <property name="code" type="String" length="32"/>
         <property name="name" type="String" length="254"/>
         <property name="ref" type="String" length="64" unique="true"/>
         <reference name="client" type="ClientEPK" description="По этому референсу выполняется слияние/разъединение"/>
         <property name="content" type="Text"/>
         <property name="applicationStatus" type="String" length="32"/>
      </class>   
      <!-- добавляем в модель событие изменения статуса заявки -->
      <event name="StatusChangeEvent">
         <property name="application" type="Application" parent="true" />
         <property name="reason" type="String" length="2048" description="Причина изменения статуса"/>
         <property name="eventUser" type="String" length="254" description="Идентификатор пользователя, действия которого привели к изменению статуса" />
      </event>
    </model>
    
  2. Создать файл с конфигурацией подписки и поместить его в проект рядом с model.xml:

    <subscriptions xmlns="DataspaceSubscriptions">
        <subscription id="applicationStatusNotify"
                        name="applicationStatusNotify"
                        description="Событие об изменении статуса заявки"
                        target="REST"
                        eventType="StatusChangeEvent"
                        callback="${baseWebHookUrl}/api/v1/statusNotify"
                        validTill="9999-12-31T23:59:59.999Z"
                        maxRetryAttempts="${applications.subscription.1.nretry}"
                        timeoutMs="${applications.subscription.1.timeout}"
                        retryDelayMs="${applications.subscription.1.retrydelay}"
                        async="true" blocking="true">
            <query>
            {
             searchApplication(cond: "it.$id=='${application}'") {
              elems {
                    id
                    code
                    name
                    ref
                    applicationStatus
              }
             }
            }  
            </query>
            <!-- структура данных на входе преобразования -->
            <!--
            {
              "event": {
                "objectId": "someEventId555",
                "creationTimestamp": "2023-04-01T22:22:22.001Z",
                "type": "StatusChangeEvent",
                "status": "processed",
                "lastChangeDate": "2023-04-01T22:22:23.551Z",
                "ownerId": "someOwnerId02",
                "application": "1231415534646745",
                "aggregateRootId": "1231415534646745",
                "reason": "Application successfully passed all checks and has been approved",
                "eventUser": "xxx-uuuuuuu-zzz"
              },
              "data": {
                "searchApplication": {
                    "elems": {
                        "id": "1231415534646745",
                        "code": "APP-X333",
                        "name": "Application for grant",
                        "ref": "APP-X333-113333332",
                        "applicationStatus": "APPROVED"
                    }
                }
              }
            }
            -->
            <!-- Ожидаемый результат -->
            <!--
            {
                "ApplicationReference": "APP-X333-113333332",    
                "ApplicationCode": "APP-X333",    
                "ApplicationName": "Application for grant",    
                "Status": "APPROVED",    
                "Reason": "Application successfully passed all checks and has been approved",
                "Timestamp": "2023-04-01T22:22:22.001Z"
            }
            -->
            <template>
            [
              {
                "operation": "shift",
                "spec": {
                    "event": {
                        "reason": "Reason"
                        "creationTimeStamp": "Timestamp"
                    },
                    "data": {
                        "searchApplication": {
                            "elems": {
                                "*": {
                                    "code": "ApplicationCode",
                                    "name": "ApplicationName",
                                    "ref": "ApplicationReference",
                                    "applicationStatus": "Status"
                                }
                            }
                        }
                    }
                 }
              }
            ]
            </template>
            <headers>
              -XTenantId=${applications.subscription.1.tenantId}
              -XchangeUser=${eventUser}
            </headers>
        </subscription>
    </subscriptions>
    
  3. Сконфигурировать подключение к REST API для публикации.

  4. Определить стендозависимую конфигурацию. customer-properties.subscription.yaml:

    kind: ConfigMap
    apiVersion: v1
    metadata:
      name: customer-properties
    data:
      customer.properties: |-
        baseWebHookUrl={{ applicationStatusNotify.baseWebHookUrl.value }}
        applications.subscription.1.nretry=1
        applications.subscription.1.timeout=10000
        applications.subscription.1.retrydelay={{ applicationStatusNotify.retrydelay.value }}
        applications.subscription.1.tenantId={{ applicationStatusNotify.tenantId.value }}
    

    custom_property.conf.yml:

    applicationStatusNotify.baseWebHookUrl.value=http://localhost:8082
    applicationStatusNotify.retrydelay.value=2000
    applicationStatusNotify.tenantId.value=tenantId
    # ...
    
  5. Собрать DataSpace.

  6. Задать стендозависимые параметры.

  7. Развернуть приложение.

  8. Вызвать /packet API DataSpace с командой create для Event и командой установки applicationStatus в "APPROVED".

    {
      "jsonrpc": "2.0",
      "method": "execute",
      "id": 1,
      "params": {
        "packet": {
          "commands": [
            {
              "id": "updateApplication",
              "name": "update",
              "params": {
                "type": "Application",
                "id": "<идентификатор_заявки>",
                "applicationStatus": "APPROVED"
              }
            },
            {
              "id": "createStatusChangeEvent",
              "name": "create",
              "params": {
                "type": "StatusChangeEvent",
                "application": "<идентификатор_заявки>",
                "reason": "заявка обработана",
                "eventUser": "<идентификатор_пользователя>"
              }
            }
          ]
        }
      }
    }