Выполнение правил трансформации события#

Описание#

Правила трансформации событий описываются в файле .tr. По умолчанию обработчик не производит никаких действий над потоком, все необходимые действия должны указываться явно.

Правила трансформации представляют собой набор текстовых инструкций, которые описывают, каким образом входной формат должен трансформироваться в выходной.

При выполнении правил трансформации события используются идентификаторы для входа и выхода. Описываются зарезервированными словами:

  1. INPUT, Input, input — для адресации элемента из входящего сообщения;

  2. OUTPUT, Output, output — для адресации элемента из выходящего сообщения.

Или

  1. IN, In, in — для адресации элемента из входящего сообщения;

  2. OUT, Out, out — для адресации элемента из выходящего сообщения.

Вход и выход имеют одинаковую структуру:

  1. headers — заголовки сообщения;

  2. key — ключ сообщения Kafka;

  3. body — тело сообщения;

  4. sourceTopic — topic источника;

  5. targetTopic — topic назначения;

  6. branch — имя ветки для маршрутизации.

Так же в этих корнях могут быть массивы элементов в случае нескольких сообщений на вход или выход.

При использовании INPUT и OUTPUT после (возможно, после индекса) обязательно должен быть указан тип узла:

  • headers;

  • key;

  • body;

  • sourceTopic;

  • targetTopic;

  • branch.

Примеры использования#

  1. Пример правил трансформации:

out = in
out.branch = "second"
headers[>].TargetTopic = "output"
out[>].first = in
headers[+>].BranchName = "transformSecond"
out[+>].second = in
headers[+>].BranchName = "transformThird"
out[+>].third = in
  1. Пример правил трансформации с указанием типа узла:

output.body = in
output.body.branch = "second"
output[>].targetTopic = "output"
output[>].body.first = in
output[+>].branch = "transformSecond"
output[>].body.second = in
output[+>].branch = "transformThird"
output[>].body.third = in
OUTPUT.body.name.var = INPUT.body.text

Использование правил трансформации при обратной совместимости#

Для обратной совместимости применимы старые методы обращения к входам и выходам.

При трансформации события происходит выполнение DSL правил трансформации:

  1. Вход IN содержит входящее событие;

  2. Environment.SourceTopic содержит имя топиков, откуда было прочитано сообщение;

  3. Environment.SourceKey содержит значение ключа из сообщения Apache Kafka в виде строки;

  4. Environment.kafka.headers содержит словарь заголовков из сообщения Apache Kafka;

  5. Environment.cache содержит элементы, сохраняющиеся между вызовами правил трансформации;

  6. Environment.loadedVariables содержит элементы, загруженные из JSON файла из параметра environmentVariables шага трансформации событий.

Выход OUT должен содержать исходящее событие или массив исходящих событий. Выход HEADERS может содержать следующие поля:

  1. TargetTopic имя топиков назначения события;

  2. TargetKey значение ключа в сообщении Apache Kafka;

  3. kafka словарь заголовков сообщения Apache Kafka, не поддерживает вложенные словари;

  4. BranchName имя следующего шага из побочной ветки потока.

Если исходящих событий больше одного, то выход HEADERS должен содержать массив элементов с перечисленными выше полями. Порядок элементов должен соответствовать порядку событий в выходе OUT.

При работе с INPUT и OUTPUT вместо выхода HEADERS нужно использовать соответствующие значения узлов INPUT и OUTPUT.

Поддерживается вызов функции:

  • return("decline"), который прерывает функцию трансформации и фильтрует полученное событие;

  • return("passthrough"), который передает входящие сообщения на следующий шаг без изменений (имеется возможность изменять заголовки и другую информацию при помощи DSL).

Вызов функции return() с любым другим параметром прервет дальнейшее выполнение правил трансформации, при этом уже заполненные выходы OUT и HEADERS будут обработаны и преобразованы в исходящие события.

Функция агрегации событий#

Правила функции агрегации событий аналогичны правилам трансформации события, только функция агрегации принимает на вход массив событий, выполняет правила трансформации и на выход возвращает одно или несколько событий в зависимости от логики правил трансформации.

При выполнении DSL правил трансформации:

  1. Вход IN содержит события из окна агрегации;

  2. Environment.sources содержит массив переменных окружения для каждого события, индекс элементов соответствует индексам событий из IN. Каждый элемент массива содержит поля:

    • SourceTopic — содержит имя топиков, откуда было прочитано сообщение;

    • SourceKey — содержит значение ключа из сообщения Apache Kafka в виде строки;

    • kafka.headers — содержит словарь заголовков из сообщения Apache Kafka;

  3. Environment.cache — содержит элементы, сохраняющиеся между вызовами функции агрегации;

  4. Environment.loadedVariables — содержит элементы, загруженные из JSON файла из параметра environmentVariables шага агрегации;

  5. Environment.AggregationKey — содержит значение ключа агрегации;

  6. Environment.AggregationWindowTimeMs — содержит продолжительность существования окна агрегации в миллисекундах;

  7. Environment.AggregationEventCount — содержит количество событий, переданных в функцию агрегации.