Преобразование форматов событий на JavaScript#
Шаг предназначен для преобразования события из одной структуры JSON в другую структуру JSON.
Представляет собой объект с полем type со значением "js" и полями:
name— имя шага;js— описание расположения правил трансформации;environmentVariables— загрузка переменных из файла;format- описание форматов входящего и исходящего событий:input- формат входящего события;output- формат исходящего события;
destination— следующий шаг потока;error— шаг обработки в случае возникновения ошибки;database— настройки для базы, используемой в правилах трансформации;async— признак асинхронного выполнения, по умолчаниюfalse;threadPoolSize— размер пула потоков для асинхронного выполнения трансформации;timeout— тайм-аут выполнения трансформации.
Значения полей js и environmentVariables представляет собой объект с полями:
source— тип источника правил, принимает значения:file— правила расположены в файловой системе;classpath— правила расположены в classpath JVM;
path— путь до файла на диске или в classpath JVM.
Значения полей input и output могут иметь значения:
json— для событий в формате JSON;xml— для событий в формате XML;объект с полями:
type— со значением «avro»;schema— список схем, значение аналогично полюdsl, объект имеющий логическое полеdefaultсо значениемtrue, будет считаться схемой по умолчанию.
Пример блока трансформации при использовании JavaScript:#
transformStep: {
name: "transformation-js"
type: "js"
format: {
input: {
type: "avro"
schema: [{
source: "file"
path: "/file/path/to/avro/schema"
},{
source: "classpath"
path: "/path/to/default/avro/schema"
default: true
}]
}
output: "json"
}
js: {
source: "file"
path: "simple.js"
}
environmentVariables: {
source: "file"
path: "./environment.json"
}
destination: {}
error: {}
}
Выполнение правил трансформации события на языке JavaScript#
Для преобразования событий необходимо заполнить файл трансформации .js. В файле описывается функция с именем transform с двумя аргументами:
environment— окружение, сохраняющееся между выполнениями трансформации;inputs— массив входящих событий.
Результатом работы функции должен быть объект с полями:
output— объект или массив исходящих событий;status— опциональное поле с возможными значениями:accept— передать содержимоеoutputдальше;decline— игнорировать поляoutput, по сути фильтрация сообщения;passthrought— передать дальше по потоку оригинальное сообщение, игнорируя содержимое поляoutput;
environment- окружение, сохраняющееся между выполнениями трансформации.
Формат события (inputs и output)#
Каждое событие представляет собой объект с полями:
key— значение ключа сообщения Apache Kafka;headers— объект с заголовками сообщения:имя поля объекта — имя заголовка;
значение поля — значение заголовка в виде строки;
body— тело сообщения, является результатом преобразования тела сообщения функциейJSON.parse();branch— имя ветки в потоке для маршрутизации сообщения;source— имя топика/очереди источника сообщения;destination— имя топика/очереди точки назначения сообщения.attributes- объект с атрибутами сообщения в формате CloudEvent
Пример трансформации#
function transform(environment, inputs) {
let output = createEmptyOutputItem();
let inputmsg = inputs[0]
Logger.debug(JSON.stringify(inputs))
if (!environment.hasOwnProperty("count")) {
environment = {
count: 0
}
}
environment = {
count: environment.count + 1
}
Logger.debug(JSON.stringify(inputmsg))
if (inputmsg.body.message == "filter") {
Logger.info("Decline message", new String(environment.count).toString(), "input-dev", "output-dev")
let status = FlowStatus.decline;
return {output,status};
}
output.attributes = {
subject: inputmsg.attributes.subject,
datacontenttype: "application/xml"
}
output.body = {
message: inputmsg.body.message,
number: environment.count
}
output.headers = inputmsg.headers;
let status = FlowStatus.accept;
return {output,status};
}
Доступные функции#
Объект FlowStatus со значениями для поля status результата трансформации#
Для заполнения значения поля status можно использовать поля объекта FlowStatus:
FlowStatus.accept— константаaccept;FlowStatus.decline— константаdecline;FlowStatus.passthrought— константаpassthrought.
Функция создания пустого исходящего события createEmptyOutputItem#
Функция createEmptyOutputItem() возвращает событие правильного формата с пустыми полями:
{key: "", headers: {}, body: {}, branch: "", source: "", destination: ""}
Функции логирования#
Logger.info(message, eventId = "", sourceTopic = "", targetTopic = "")— формирует лог с сообщением в JSON-формате на уровень логирования info;Logger.error(message, eventId = "", sourceTopic = "", targetTopic = "")— формирует лог с сообщением в JSON-формате на уровень логирования error;Logger.warn(message, eventId = "", sourceTopic = "", targetTopic = "")— формирует лог с сообщением в JSON-формате на уровень логирования warn;Logger.debug(message, eventId = "", sourceTopic = "", targetTopic = "")— формирует лог с сообщением в JSON-формате на уровень логирования debug.
Пример сообщения:
{"timeStamp":"2020-09-17T16:35:51.040+03:00","eventId":"5643","sourceTopic":"SOURCE_TOPIC","targetTopic":"TARGETTOPIC","message":"log message"}
В случае, если какой-либо из параметров не определен, то значение этого параметра в логе будет отображаться как Undefined.
Например:
Logger.info(«log_message»)
{"timeStamp":"2020-09-17T16:35:51.040+03:00","eventId":"Undefined","sourceTopic":"Undefined","targetTopic":"Undefined","message":"log_message"}