Агрегация событий#

Шаг предназначен для агрегации нескольких событий в одно с помощью модуля declarative-mapper.

Представляет собой объект с полем type со значением "aggregation" и полями:

  1. name — имя шага;

  2. environmentVariables — загрузка переменных из файла;

  3. format — описание форматов входящих и исходящих событий:

  4. key — описание вычисление ключа события;

  5. trigger — описание триггера агрегации событий:

    • объект с полем type со значениями:

      • count — триггер подсчета заданного количества событий, в отдельном поле count задается число агрегируемых событий;

      • dsl — триггер с логикой работы на DSL, содержит поля аналогичные полю dsl;

      • time — триггер срабатывания по времени, срабатывает раз в сутки в заданное время;

      • timeFromEvent — триггер срабатывания по времени из события, срабатывает раз в сутки согласно заданному времени, полученному из события при помощи DSL;

    • также содержит поле timeout со значением тайм-аута в миллисекундах после получения последнего события;

  6. dsl — описание расположения правил агрегации;

  7. stateFull — сохранение Environment.cache в состоянии Flink, логическое, по умолчанию false;

  8. destination — следующий шаг потока;

  9. error — шаг обработки в случае возникновения ошибки;

  10. shouldLogDebug - флаг для вывода информации о результатах выполнения шагов dsl (при триггере с логикой работы на dsl);

  11. storage - настройки хранилища агрегации, отличающиеся от заданных в reactiveConfig (реализован только для компонента EVPC).

Значения полей dsl и environmentVariables представляет собой объект с полями:

  1. source — тип источника правил, принимает значения:

    • file — правила расположены в файловой системе;

    • classpath — правила расположены в classpath JVM.

  2. path — путь до файла на диске или в classpath JVM.

Значения полей input и output могут иметь значения:

  1. json — для событий в формате JSON;

  2. xml — для событий в формате XML;

  3. событие в формате csv:

    • type — со значением csv;

    • columnSeparator — разделитель полей события;

    • fieldNames — список имен полей события для использования при трансформации, также определяет порядок полей.

  4. объект с полями:

    • type — со значением avro;

    • schema — список схем, значение аналогично полю dsl, объект имеющий логическое поле default со значением true, будет считаться схемой по умолчанию.

Пример агрегации событий#

{
  source: {
    name: "testInput"
    type: "test"
    topic: "input"
    config: ${defaults.kafka}
    destination: ${aggregationStep}
  }

  deadletter: {
    name: "deadletter"
    type: "destination"
    topic: "deadletter"
    config: ${defaults.kafka}
  }

  destination: {
    name: "output"
    type: "destination"
    topic: "output"
    config: ${defaults.kafka}
  }

  aggregationStep: {
    name: "dsl-aggregation"
    type: "aggregation"
    format: {
      input: "json"
      output: "json"
    }
    key {
      type: "dsl"
      path: "key.tr"
    }
    trigger: {
      type: "dsl"
      path: "trigger.tr"
      timeout: 100
    }
    dsl: {
      path: "simple.tr"
    }
    destination: ${destination}
    error: ${deadletter}
  }

  flow: {
    name: "Event Process test"
    source: [${source}]
  }
}

Avro-схема выбирается исходя из значения заголовка "schemaName" в сообщении Apache Kafka. Если заголовок не задан, то используется схема по умолчанию или первая в списке в случае, если никакая схема не помечена полем default со значением true.

Логика потока агрегации событий#

Поток агрегации состоит из 3 частей:

  1. Вычисление ключа события и сортировка потока событий по ключу;

  2. Временное окно и триггер отправки событий в функцию агрегации;

  3. Функция агрегации событий.

Вычисление ключа событий#

Логика вычисления ключа для события задается параметром key, объект с полем type которое принимает следующие значения:

  1. key — вычисление ключа по значению ключа из сообщения Apache Kafka, представляет собой объект с полем type равным key;

  2. headers — вычисление ключа по определенным значениям заголовков, представляет собой объект с полем type равным headers и в поле keys содержит массив заголовков, из которых должен формироваться ключ. Значение ключа является объединением значений заголовков с разделителем |.

  3. hash — вычисление ключа хеш функцией из тела сообщения. Представляет собой объект с полем type равным hash, опционально в поле algorithm содержит имя хеш алгоритма, по умолчанию SHA-256.

  4. dsl — вычисление ключа с помощью правил трансформации, объект содержит поля:

    • type со значением "dsl";

    • source — источник правил трансформации classpath или file;

    • path — путь до правил трансформации.

При вычислении ключа через DSL правил трансформации:

  1. Вход IN содержит событие;

  2. Environment.SourceTopic содержит имя топика, откуда было прочитано сообщение;

  3. Environment.SourceKey содержит значение ключа из сообщения Apache Kafka в виде строки;

  4. Environment.kafka.headers содержит словарь заголовков из сообщения Apache Kafka.

При вычислении ключа с типом dsl правила трансформации должны завершаться вызовом функции return(<key>), где <key> — значение ключа. Выходы OUT и HEADERS скрипта трансформации игнорируются.

Окно агрегации событий#

После вычисления ключа событие попадает в окно агрегации. В универсальном обработчике используется глобальное окно агрегации.

Session Window

В нашем случае параметр timeout у триггера задает session gap, то есть максимальное время между получением событий с одним ключом. В момент возникновения тайм-аута все собранные события в окне передаются в функцию агрегации, а окно агрегации очищается.

Для каждого нового события создается свое окно агрегации, затем ищется существующее окно для событий с таким же ключом. Если окно было создано ранее, то старое окно и новое объединяются. При этом начало нового окна становится началом первого окна, а конец берется из второго окна.

Окна до объединения#

Before merge

Окна после объединения#

After merge

В процессе объединения окон происходит объединение состояний, после чего вызывается триггер.

Триггер отправки событий в функцию агрегации#

Предусмотрено 4 вида триггера:

  1. Триггер подсчета количества событий в окне агрегации, срабатывает при накоплении заданного числа событий или при наступлении тайм-аута относительно получения первого события.

  2. Триггер с логикой на DSL правил трансформации событий, срабатывает согласно заданным в DSL правилам или при наступлении тайм-аута относительно получения первого события.

  3. Триггер срабатывания по времени, срабатывает раз в сутки в заданное время.

  4. Триггер срабатывания по времени из события, срабатывает раз в сутки согласно заданному времени, полученному из события при помощи DSL.

Настройка триггера подсчета количества событий#

Задается объектом в поле trigger шага агрегации с полями:

  1. type — со значением "count";

  2. count — целое число, равное количеству ожидаемых событий. Триггер срабатывает в момент достижения количества событий этого значения;

  3. timeout — время в миллисекундах после получения последнего события в окне агрегации и до срабатывания триггера по тайм-ауту;

  4. shouldFireAtProcessingTime — должен ли срабатывать триггер по тайм-ауту относительно получения первого события, значение по умолчанию true.

После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.

trigger: {
   type: "count"
   count: 2
   timeout: 100
}

Настройка триггера с логикой на DSL#

Задается объектом в поле trigger шага агрегации с полями:

  1. type — со значением "dsl";

  2. source — источник правил трансформации, может принимать значения "classpath" и "file", по умолчанию "classpath";

  3. path — путь до файла с правилами;

  4. timeout — время в миллисекундах после получения последнего события в окне агрегации и до срабатывания триггера по тайм-ауту;

  5. shouldFireAtProcessingTime — должен ли срабатывать триггер по тайм-ауту относительно получения первого события, значение по умолчанию true.

trigger: {
    type: "dsl"
    source: "file"
    path: "script.tr"
    timeout: 100
}

После срабатывания триггера по тайм-ауту происходит передача накопленных событий в функцию агрегации и очистка окна.

При добавлении нового события в окно агрегации происходит выполнение DSL правил трансформации:

  1. Вход IN содержит новое событие;

  2. Environment.SourceTopic содержит имя топиков, откуда было прочитано сообщение;

  3. Environment.SourceKey содержит значение ключа из сообщения Apache Kafka в виде строки;

  4. Environment.cache содержит элементы, сохраняющиеся между вызовами триггера;

  5. Environment.TriggerFireMs содержит время вызова триггера в миллисекундах в формате Unix;

  6. Environment.kafka.headers содержит словарь заголовков из сообщения Apache Kafka.

DSL скрипт правил трансформации должен вернуть функцией return() один из следующих статусов:

  1. return("continue") — продолжение накопления событий в окне агрегации;

  2. return("fire") — передача накопленных событий в функцию агрегации без очистки и закрытия окна;

  3. return("purge") — очистка и закрытие окна без передачи накопленных событий в функцию агрегации;

  4. return("fireAndPurge") — передача накопленных событий в функцию агрегации с очисткой и закрытием окна.

Выходы OUT и HEADERS в триггере игнорируются.

Если функция return() не была вызвана или вызвана с другим значением, то накопленные события будут переданы в функцию агрегации, а окно очищено и закрыто.

Настройка триггера срабатывания по времени#

Задается объектом в поле trigger шага агрегации с полями:

  1. type со значением "time";

  2. time — время, равное ожидаемому времени срабатывания, триггер срабатывает в момент достижения текущего времени этого значения;

  3. timePattern — формат времени;

  4. maxEmptyIntervals — количество пустых интервалов до сброса ключей агрегации.

После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.

trigger: {
 type: "time"
 time: "11:05:00+03:00"
 timePattern: "HH:mm:ssXXX"
 maxEmptyIntervals: 3
}

Настройка триггера срабатывания по времени из события#

Задается объектом в поле trigger шага агрегации с полями:

  1. type со значением "timeFromEvent";

  2. source — источник правил трансформации, может принимать значения "classpath" и "file", по умолчанию "classpath";

  3. path — путь до файла правил трансформации с логикой срабатывания триггера по времени из события на DSL;

  4. timePattern — формат времени;

  5. maxEmptyIntervals — количество пустых интервалов до сброса ключей агрегации.

После срабатывания триггера происходит передача накопленных событий в функцию агрегации и очистка окна.

trigger: {
  type: "timeFromEvent"
  path: "time.tr"
  timePattern: "HH:mm:ss"
  maxEmptyIntervals: 3
}