Агрегация событий#
Шаг предназначен для агрегации нескольких событий в одно с помощью модуля declarative-mapper.
Представляет собой объект с полем type со значением "aggregation" и полями:
name— имя шага;environmentVariables— загрузка переменных из файла;format— описание форматов входящих и исходящих событий:input— формат входящих событий, аналогично Преобразование форматов событий;output— формат исходящих событий;
key— описание вычисление ключа события;trigger— описание триггера агрегации событий:объект с полем
typeсо значениями:count— триггер подсчета заданного количества событий, в отдельном полеcountзадается число агрегируемых событий;dsl— триггер с логикой работы на DSL, содержит поля аналогичные полюdsl;time— триггер срабатывания по времени, срабатывает раз в сутки в заданное время;timeFromEvent— триггер срабатывания по времени из события, срабатывает раз в сутки согласно заданному времени, полученному из события при помощи DSL;
также содержит поле
timeoutсо значением тайм-аута в миллисекундах после получения последнего события;
dsl— описание расположения правил агрегации;stateFull— сохранениеEnvironment.cacheв состоянииFlink, логическое, по умолчаниюfalse;destination— следующий шаг потока;error— шаг обработки в случае возникновения ошибки;shouldLogDebug- флаг для вывода информации о результатах выполнения шагов dsl (при триггере с логикой работы на dsl);storage- настройки хранилища агрегации, отличающиеся от заданных в reactiveConfig (реализован только для компонента EVPC).
Значения полей dsl и environmentVariables представляет собой объект с полями:
source— тип источника правил, принимает значения:file— правила расположены в файловой системе;classpath— правила расположены в classpath JVM.
path— путь до файла на диске или в classpath JVM.
Значения полей input и output могут иметь значения:
json— для событий в форматеJSON;xml— для событий в форматеXML;событие в формате
csv:type— со значениемcsv;columnSeparator— разделитель полей события;fieldNames— список имен полей события для использования при трансформации, также определяет порядок полей.
объект с полями:
type— со значениемavro;schema— список схем, значение аналогично полюdsl, объект имеющий логическое полеdefaultсо значениемtrue, будет считаться схемой по умолчанию.
Пример агрегации событий#
{
source: {
name: "testInput"
type: "test"
topic: "input"
config: ${defaults.kafka}
destination: ${aggregationStep}
}
deadletter: {
name: "deadletter"
type: "destination"
topic: "deadletter"
config: ${defaults.kafka}
}
destination: {
name: "output"
type: "destination"
topic: "output"
config: ${defaults.kafka}
}
aggregationStep: {
name: "dsl-aggregation"
type: "aggregation"
format: {
input: "json"
output: "json"
}
key {
type: "dsl"
path: "key.tr"
}
trigger: {
type: "dsl"
path: "trigger.tr"
timeout: 100
}
dsl: {
path: "simple.tr"
}
destination: ${destination}
error: ${deadletter}
}
flow: {
name: "Event Process test"
source: [${source}]
}
}
Avro-схема выбирается исходя из значения заголовка "schemaName" в сообщении Apache Kafka. Если заголовок не задан,
то используется схема по умолчанию или первая в списке в случае, если никакая схема не помечена полем default со значением true.
Логика потока агрегации событий#
Поток агрегации состоит из 3 частей:
Вычисление ключа события и сортировка потока событий по ключу;
Временное окно и триггер отправки событий в функцию агрегации;
Функция агрегации событий.
Вычисление ключа событий#
Логика вычисления ключа для события задается параметром key, объект с полем type которое принимает следующие значения:
key— вычисление ключа по значению ключа из сообщения Apache Kafka, представляет собой объект с полемtypeравнымkey;headers— вычисление ключа по определенным значениям заголовков, представляет собой объект с полемtypeравнымheadersи в полеkeysсодержит массив заголовков, из которых должен формироваться ключ. Значение ключа является объединением значений заголовков с разделителем|.hash— вычисление ключа хеш функцией из тела сообщения. Представляет собой объект с полемtypeравнымhash, опционально в полеalgorithmсодержит имя хеш алгоритма, по умолчаниюSHA-256.dsl— вычисление ключа с помощью правил трансформации, объект содержит поля:typeсо значением"dsl";source— источник правил трансформацииclasspathилиfile;path— путь до правил трансформации.
При вычислении ключа через DSL правил трансформации:
Вход
INсодержит событие;Environment.SourceTopicсодержит имя топика, откуда было прочитано сообщение;Environment.SourceKeyсодержит значение ключа из сообщения Apache Kafka в виде строки;Environment.kafka.headersсодержит словарь заголовков из сообщения Apache Kafka.
При вычислении ключа с типом dsl правила трансформации должны завершаться вызовом функции return(<key>), где <key> — значение ключа.
Выходы OUT и HEADERS скрипта трансформации игнорируются.
Окно агрегации событий#
После вычисления ключа событие попадает в окно агрегации. В универсальном обработчике используется глобальное окно агрегации.
Иллюстрация сессионного окна из документации Apache Flink.#

В нашем случае параметр timeout у триггера задает session gap, то есть максимальное время между получением событий с одним ключом.
В момент возникновения тайм-аута все собранные события в окне передаются в функцию агрегации, а окно агрегации очищается.
Для каждого нового события создается свое окно агрегации, затем ищется существующее окно для событий с таким же ключом. Если окно было создано ранее, то старое окно и новое объединяются. При этом начало нового окна становится началом первого окна, а конец берется из второго окна.
Окна до объединения#

Окна после объединения#

В процессе объединения окон происходит объединение состояний, после чего вызывается триггер.
Триггер отправки событий в функцию агрегации#
Предусмотрено 4 вида триггера:
Триггер подсчета количества событий в окне агрегации, срабатывает при накоплении заданного числа событий или при наступлении тайм-аута относительно получения первого события.
Триггер с логикой на DSL правил трансформации событий, срабатывает согласно заданным в DSL правилам или при наступлении тайм-аута относительно получения первого события.
Триггер срабатывания по времени, срабатывает раз в сутки в заданное время.
Триггер срабатывания по времени из события, срабатывает раз в сутки согласно заданному времени, полученному из события при помощи DSL.
Настройка триггера подсчета количества событий#
Задается объектом в поле trigger шага агрегации с полями:
type— со значением"count";count— целое число, равное количеству ожидаемых событий. Триггер срабатывает в момент достижения количества событий этого значения;timeout— время в миллисекундах после получения последнего события в окне агрегации и до срабатывания триггера по тайм-ауту;shouldFireAtProcessingTime— должен ли срабатывать триггер по тайм-ауту относительно получения первого события, значение по умолчаниюtrue.
После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.
trigger: {
type: "count"
count: 2
timeout: 100
}
Настройка триггера с логикой на DSL#
Задается объектом в поле trigger шага агрегации с полями:
type— со значением"dsl";source— источник правил трансформации, может принимать значения"classpath"и"file", по умолчанию"classpath";path— путь до файла с правилами;timeout— время в миллисекундах после получения последнего события в окне агрегации и до срабатывания триггера по тайм-ауту;shouldFireAtProcessingTime— должен ли срабатывать триггер по тайм-ауту относительно получения первого события, значение по умолчаниюtrue.
trigger: {
type: "dsl"
source: "file"
path: "script.tr"
timeout: 100
}
После срабатывания триггера по тайм-ауту происходит передача накопленных событий в функцию агрегации и очистка окна.
При добавлении нового события в окно агрегации происходит выполнение DSL правил трансформации:
Вход
INсодержит новое событие;Environment.SourceTopicсодержит имя топиков, откуда было прочитано сообщение;Environment.SourceKeyсодержит значение ключа из сообщения Apache Kafka в виде строки;Environment.cacheсодержит элементы, сохраняющиеся между вызовами триггера;Environment.TriggerFireMsсодержит время вызова триггера в миллисекундах в формате Unix;Environment.kafka.headersсодержит словарь заголовков из сообщения Apache Kafka.
DSL скрипт правил трансформации должен вернуть функцией return() один из следующих статусов:
return("continue")— продолжение накопления событий в окне агрегации;return("fire")— передача накопленных событий в функцию агрегации без очистки и закрытия окна;return("purge")— очистка и закрытие окна без передачи накопленных событий в функцию агрегации;return("fireAndPurge")— передача накопленных событий в функцию агрегации с очисткой и закрытием окна.
Выходы OUT и HEADERS в триггере игнорируются.
Если функция return() не была вызвана или вызвана с другим значением, то накопленные события будут переданы в функцию агрегации, а окно очищено и закрыто.
Настройка триггера срабатывания по времени#
Задается объектом в поле trigger шага агрегации с полями:
typeсо значением"time";time— время, равное ожидаемому времени срабатывания, триггер срабатывает в момент достижения текущего времени этого значения;timePattern— формат времени;maxEmptyIntervals— количество пустых интервалов до сброса ключей агрегации.
После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.
trigger: {
type: "time"
time: "11:05:00+03:00"
timePattern: "HH:mm:ssXXX"
maxEmptyIntervals: 3
}
Настройка триггера срабатывания по времени из события#
Задается объектом в поле trigger шага агрегации с полями:
typeсо значением"timeFromEvent";source— источник правил трансформации, может принимать значения"classpath"и"file", по умолчанию"classpath";path— путь до файла правил трансформации с логикой срабатывания триггера по времени из события на DSL;timePattern— формат времени;maxEmptyIntervals— количество пустых интервалов до сброса ключей агрегации.
После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.
trigger: {
type: "timeFromEvent"
path: "time.tr"
timePattern: "HH:mm:ss"
maxEmptyIntervals: 3
}