Преобразование форматов событий на JavaScript#

Шаг предназначен для преобразования события из одной структуры JSON в другую структуру JSON.

Представляет собой объект с полем type со значением "js" и полями:

  1. name — имя шага;

  2. js — описание расположения правил трансформации;

  3. environmentVariables — загрузка переменных из файла;

  4. format - описание форматов входящего и исходящего событий:

    • input - формат входящего события;

    • output - формат исходящего события;

  5. destination — следующий шаг потока;

  6. error — шаг обработки в случае возникновения ошибки;

  7. database — настройки для базы, используемой в правилах трансформации;

  8. async — признак асинхронного выполнения, по умолчанию false;

  9. threadPoolSize — размер пула потоков для асинхронного выполнения трансформации;

  10. timeout — тайм-аут выполнения трансформации.

Значения полей js и environmentVariables представляет собой объект с полями:

  1. source — тип источника правил, принимает значения:

    • file — правила расположены в файловой системе;

    • classpath — правила расположены в classpath JVM;

  2. path — путь до файла на диске или в classpath JVM.

Значения полей input и output могут иметь значения:

  1. json — для событий в формате JSON;

  2. xml — для событий в формате XML;

  3. объект с полями:

    • type — со значением «avro»;

    • schema — список схем, значение аналогично полю dsl, объект имеющий логическое поле default со значением true, будет считаться схемой по умолчанию.

Пример блока трансформации при использовании JavaScript:#

transformStep: {
    name: "transformation-js"
    type: "js"
    format: {
        input: {
                type: "avro"
                schema: [{
                    source: "file"
                    path: "/file/path/to/avro/schema"
                },{
                    source: "classpath"
                    path: "/path/to/default/avro/schema"
                    default: true
                }]
        }
        output: "json"      
    }
    js: {
      source: "file"
      path: "simple.js"
    }
    environmentVariables: {      
      source: "file"
      path: "./environment.json"
    } 
    destination: {}
    error: {}
  }

Выполнение правил трансформации события на языке JavaScript#

Для преобразования событий необходимо заполнить файл трансформации .js. В файле описывается функция с именем transform с двумя аргументами:

  1. environment — окружение, сохраняющееся между выполнениями трансформации;

  2. inputs — массив входящих событий.

Результатом работы функции должен быть объект с полями:

  1. output — объект или массив исходящих событий;

  2. status — опциональное поле с возможными значениями:

    • accept — передать содержимое output дальше;

    • decline — игнорировать поля output, по сути фильтрация сообщения;

    • passthrought — передать дальше по потоку оригинальное сообщение, игнорируя содержимое поля output;

  3. environment - окружение, сохраняющееся между выполнениями трансформации.

Формат события (inputs и output)#

Каждое событие представляет собой объект с полями:

  1. key — значение ключа сообщения Apache Kafka;

  2. headers — объект с заголовками сообщения:

    • имя поля объекта — имя заголовка;

    • значение поля — значение заголовка в виде строки;

  3. body — тело сообщения, является результатом преобразования тела сообщения функцией JSON.parse();

  4. branch — имя ветки в потоке для маршрутизации сообщения;

  5. source — имя топика/очереди источника сообщения;

  6. destination — имя топика/очереди точки назначения сообщения.

  7. attributes - объект с атрибутами сообщения в формате CloudEvent

Пример трансформации#

function transform(environment, inputs) {
    let output = createEmptyOutputItem();
    let inputmsg = inputs[0]
    Logger.debug(JSON.stringify(inputs))
    if (!environment.hasOwnProperty("count")) {
        environment = {
            count: 0
        }
    }
    environment = {
        count: environment.count + 1
    }
    Logger.debug(JSON.stringify(inputmsg))
    if (inputmsg.body.message == "filter") {
        Logger.info("Decline message", new String(environment.count).toString(), "input-dev", "output-dev")
        let status = FlowStatus.decline;
        return {output,status};
    }
    output.attributes = {
        subject: inputmsg.attributes.subject,
        datacontenttype: "application/xml"
    }
    output.body = {
        message: inputmsg.body.message,
        number: environment.count
    }
    output.headers = inputmsg.headers;
    let status = FlowStatus.accept;
    return {output,status};
}

Доступные функции#

Объект FlowStatus со значениями для поля status результата трансформации#

Для заполнения значения поля status можно использовать поля объекта FlowStatus:

  1. FlowStatus.accept — константа accept;

  2. FlowStatus.decline — константа decline;

  3. FlowStatus.passthrought — константа passthrought.

Функция создания пустого исходящего события createEmptyOutputItem#

Функция createEmptyOutputItem() возвращает событие правильного формата с пустыми полями:

{key: "", headers: {}, body: {}, branch: "", source: "", destination: ""}

Функции логирования#

  • Logger.info(message, eventId = "", sourceTopic = "", targetTopic = "") — формирует лог с сообщением в JSON-формате на уровень логирования info;

  • Logger.error(message, eventId = "", sourceTopic = "", targetTopic = "") — формирует лог с сообщением в JSON-формате на уровень логирования error;

  • Logger.warn(message, eventId = "", sourceTopic = "", targetTopic = "") — формирует лог с сообщением в JSON-формате на уровень логирования warn;

  • Logger.debug(message, eventId = "", sourceTopic = "", targetTopic = "") — формирует лог с сообщением в JSON-формате на уровень логирования debug.

Пример сообщения:

{"timeStamp":"2020-09-17T16:35:51.040+03:00","eventId":"5643","sourceTopic":"SOURCE_TOPIC","targetTopic":"TARGETTOPIC","message":"log message"}

В случае, если какой-либо из параметров не определен, то значение этого параметра в логе будет отображаться как Undefined.

Например:

Logger.info(«log_message»)

{"timeStamp":"2020-09-17T16:35:51.040+03:00","eventId":"Undefined","sourceTopic":"Undefined","targetTopic":"Undefined","message":"log_message"}