Агрегация событий#

Шаг предназначен для агрегации нескольких событий в одно с помощью модуля declarative-mapper.

Представляет собой объект с полем type со значением "aggregation" и полями:

  1. name — имя шага;

  2. environmentVariables — загрузка переменных из файла;

  3. format — описание форматов входящих и исходящих событий:

  4. key — описание вычисление ключа события;

  5. trigger — описание триггера агрегации событий, объект с полем type:

    • count — триггер подсчета заданного количества событий, в отдельном поле count задается число агрегируемых событий;

    • dsl — триггер с логикой работы на DSL, содержит поля аналогичные полю dsl; Также содержит поле timeout со значением таймаута в миллисекундах после получения последнего события:

  6. dsl — описание расположения правил агрегации;

  7. stateFull — сохранение Environment.cache в состоянии Flink, логическое, по умолчанию false;

  8. destination — следующий шаг потока;

  9. error — шаг обработки в случае возникновения ошибки.

Значения полей dsl и environmentVariables представляет собой объект с полями:

  1. source — тип источника правил, принимает значения:

    • file — правила расположены в файловой системе;

    • classpath — правила расположены в classpath JVM.

  2. path — путь до файла на диске или в classpath JVM.

Значения полей input и output могут иметь значения:

  1. json — для событий в формате JSON;

  2. xml — для событий в формате XML;

  3. событие в формате csv:

    • type — со значением csv;

    • columnSeparator — разделитель полей события;

    • fieldNames — список имен полей события для использования при трансформации, также определяет порядок полей.

  4. объект с полями:

    • type — со значением avro;

    • schema — список схем, значение аналогично полю dsl, объект имеющий логическое поле default со значением true, будет считаться схемой по умолчанию.

Пример агрегации событий#

{
  name: "aggregation"
  type: "aggregation"
  format: {
    input: "json"
    output: {
               type: "avro"
               schema: [{
                 source: "file"
                 path: "/file/path/to/avro/schema"
               },{
                 source: "classpath"
                 path: "/path/to/default/avro/schema"
                 default: true
               }]
             }
  }
  key {
    type: "dsl"
    path: "testDsl/aggregation/key.tr"
  }
  trigger: {
    type: "dsl"
    path: "testDsl/aggregation/trigger.tr"
    timeout: 100
  }
  dsl: {
    path: "testDsl/aggregation/simple.tr"
  }
  destination: {
    name: "output"
    type: "destination"
    topic: "output"
    config: {
      type: "kafka"
      name: "output"
    }
  }
  error: {
    name: "deadletter"
    type: "destination"
    topic: "deadletter"
    config: {
      type: "kafka"
      name: "output"
    }
  }
}

Avro-схема выбирается исходя из значения заголовка "schemaName" в сообщении Apache Kafka. Если заголовок не задан, то используется схема по умолчанию или первая в списке в случае, если никакая схема не помечена полем default со значением true.

Логика потока агрегации событий#

Поток агрегации состоит из 3 частей:

  1. Вычисление ключа события и сортировка потока событий по ключу;

  2. Временное окно и триггер отправки событий в функцию агрегации;

  3. Функция агрегации событий.

Вычисление ключа событий#

Логика вычисления ключа для события задается параметром key, объект с полем type которое принимает следующие значения:

  1. key — вычисление ключа по значению ключа из сообщения Apache Kafka, представляет собой объект с полем type равным key;

  2. headers — вычисление ключа по определенным значениям заголовков, представляет собой объект с полем type равным headers и в поле keys содержит массив заголовков, из которых должен формироваться ключ. Значение ключа является объединением значений заголовков с разделителем |.

  3. hash — вычисление ключа хеш функцией из тела сообщения. Представляет собой объект с полем type равным hash, опционально в поле algorithm содержит имя хеш алгоритма, по умолчанию SHA-256.

  4. dsl — вычисление ключа с помощью правил трансформации, объект содержит поля:

    • type со значением "dsl";

    • source — источник правил трансформации classpath или file;

    • path — путь до правил трансформации.

При вычислении ключа через DSL правил трансформации:

  1. Вход IN содержит событие;

  2. Environment.SourceTopic содержит имя топика, откуда было прочитано сообщение;

  3. Environment.SourceKey содержит значение ключа из сообщения Apache Kafka в виде строки;

  4. Environment.kafka.headers содержит словарь заголовков из сообщения Apache Kafka.

При вычислении ключа с типом dsl правила трансформации должны завершаться вызовом функции return(<key>), где <key> — значение ключа. Выходы OUT и HEADERS скрипта трансформации игнорируются.

Окно агрегации событий#

После вычисления ключа событие попадает в окно агрегации. В универсальном обработчике используется глобальное окно агрегации.

Session Window

В нашем случае параметр timeout у триггера задает session gap, то есть максимальное время между получением событий с одним ключом. В момент возникновения таймаута все собранные события в окне передаются в функцию агрегации, а окно агрегации очищается.

Для каждого нового события создается свое окно агрегации, затем ищется существующее окно для событий с таким же ключом. Если окно было создано ранее, то старое окно и новое объединяются. При этом начало нового окна становится началом первого окна, а конец берется из второго окна.

Окна до объединения#

Before merge

Окна после объединения#

After merge

В процессе объединения окон происходит объединение состояний, после чего вызывается триггер.

Триггер отправки событий в функцию агрегации#

Предусмотрено 4 вида триггера:

  1. Триггер подсчета количества событий в окне агрегации, срабатывает при накоплении заданного числа событий или при наступлении таймаута относительно получения первого события.

  2. Триггер с логикой на DSL правил трансформации событий, срабатывает согласно заданным в DSL правилам или при наступлении таймаута относительно получения первого события.

  3. Триггер срабатывания по времени, срабатывает раз в сутки в заданное время.

  4. Триггер срабатывания по времени из события, срабатывает раз в сутки согласно заданному времени, полученному из события при помощи DSL.

Настройка триггера подсчета количества событий#

Задается объектом в поле trigger шага агрегации с полями:

  1. type — со значением "count";

  2. count — целое число, равное количеству ожидаемых событий. Триггер срабатывает в момент достижения количества событий этого значения;

  3. timeout — время в миллисекундах после получения последнего события в окне агрегации и до срабатывания триггера по таймауту;

  4. shouldFireAtProcessingTime — должен ли срабатывать триггер по таймауту относительно получения первого события, значение по умолчанию true.

После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.

trigger: {
   type: "count"
   count: 2
   timeout: 100
 }

Настройка триггера с логикой на DSL#

Задается объектом в поле trigger шага агрегации с полями:

  1. type — со значением "dsl";

  2. source — источник правил трансформации, может принимать значения "classpath" и "file", по умолчанию "classpath";

  3. path — путь до файла с правилами;

  4. timeout — время в миллисекундах после получения последнего события в окне агрегации и до срабатывания триггера по таймауту;

  5. shouldFireAtProcessingTime — должен ли срабатывать триггер по таймауту относительно получения первого события, значение по умолчанию true.

trigger: {
    type: "dsl"
    source: "file"
    path: "/path/to/dsl/script.tr"
    timeout: 100
  }

После срабатывания триггера по таймауту происходит передача накопленных событий в функцию агрегации и очистка окна.

При добавлении нового события в окно агрегации происходит выполнение DSL правил трансформации:

  1. Вход IN содержит новое событие;

  2. Environment.SourceTopic содержит имя топика, откуда было прочитано сообщение;

  3. Environment.SourceKey содержит значение ключа из сообщения Apache Kafka в виде строки;

  4. Environment.cache содержит элементы, сохраняющиеся между вызовами триггера;

  5. Environment.TriggerFireMs содержит время вызова триггера в миллисекундах в формате Unix;

  6. Environment.kafka.headers содержит словарь заголовков из сообщения Apache Kafka.

DSL скрипт правил трансформации должен вернуть функцией return() один из следующих статусов:

  1. return("continue") — продолжение накопления событий в окне агрегации;

  2. return("fire") — передача накопленных событий в функцию агрегации без очистки и закрытия окна;

  3. return("purge") — очистка и закрытие окна без передачи накопленных событий в функцию агрегации;

  4. return("fireAndPurge") — передача накопленных событий в функцию агрегации с очисткой и закрытием окна.

Выходы OUT и HEADERS в триггере игнорируются.

Если функция return() не была вызвана или вызвана с другим значением, то накопленные события будут переданы в функцию агрегации, а окно очищено и закрыто.

Настройка триггера срабатывания по времени#

Задается объектом в поле trigger шага агрегации с полями:

  1. type со значением "time";

  2. time — время, равное ожидаемому времени срабатывания, триггер срабатывает в момент достижения текущего времени этого значения;

  3. timePattern — формат времени;

  4. maxEmptyIntervals — количество пустых интервалов до сброса ключей агрегации.

После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.

trigger: {
 type: "time"
 time: "11:05:00+03:00"
 timePattern: "HH:mm:ssXXX"
 maxEmptyIntervals: 3
}

Настройка триггера срабатывания по времени из события#

Задается объектом в поле trigger шага агрегации с полями:

  1. type со значением "timeFromEvent";

  2. source — источник правил трансформации, может принимать значения "classpath" и "file", по умолчанию "classpath";

  3. path — время, равное ожидаемому времени срабатывания, триггер срабатывает в момент достижения текущего времени этого значения;

  4. timePattern — формат времени;

  5. maxEmptyIntervals — количество пустых интервалов до сброса ключей агрегации.

После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.

trigger: {
 type: "timeFromEvent"
 path: "testDsl/aggregation/time.tr"
 timePattern: "HH:mm:ss"
 maxEmptyIntervals: 3
}

Функция агрегации событий#

Функция агрегации принимает на вход массив событий, выполняет правила трансформации и на выход возвращает одно или несколько событий в зависимости от логики правил трансформации.

При выполнении DSL правил трансформации:

  1. Вход IN содержит события из окна агрегации;

  2. Environment.sources содержит массив переменных окружения для каждого события, индекс элементов соответствует индексам событий из IN. Каждый элемент массива содержит поля:

    • SourceTopic — содержит имя топика, откуда было прочитано сообщение;

    • SourceKey — содержит значение ключа из сообщения Apache Kafka в виде строки;

    • kafka.headers — содержит словарь заголовков из сообщения Apache Kafka;

  3. Environment.cache — содержит элементы, сохраняющиеся между вызовами функции агрегации;

  4. Environment.loadedVariables — содержит элементы, загруженные из JSON файла из параметра environmentVariables шага агрегации;

  5. Environment.AggregationKey — содержит значение ключа агрегации;

  6. Environment.AggregationWindowTimeMs — содержит продолжительность существования окна агрегации в миллисекундах;

  7. Environment.AggregationEventCount — содержит количество событий, переданных в функцию агрегации.

Выход OUT должен содержать исходящее событие или массив исходящих событий. Выход HEADERS может содержать следующие поля:

  1. TargetTopic — имя топика назначения события;

  2. TargetKey — значение ключа в сообщении Apache Kafka;

  3. kafka — словарь заголовков сообщения Apache Kafka, не поддерживает вложенные словари;

  4. BranchName — имя следующего шага из побочной ветки потока.

Если исходящих событий больше одного, то выход HEADERS должен содержать массив элементов с перечисленными выше полями. Порядок элементов должен соответствовать порядку событий в выходе OUT.

Поддерживается вызов функции return("decline"), который прерывает функцию агрегации и фильтрует полученные события.

Поддерживается вызов функции return("passthrough"), который передает входящие сообщение на следующий шаг без изменений (имеется возможность изменять заголовки и другую информацию при помощи DSL).

Вызов функции return() с любым другим параметром прервет дальнейшее выполнение правил трансформации, при этом уже заполненные выходы OUT и HEADERS будут обработаны и преобразованы в исходящие события.