Пример потока обработки#
Конфигурация потока обработки#
{
// Базовый блок потока
flow: {
name: "EXAMPLE_NAME" // Наименование потока
source: [${source_web}, ${source_app}] // Массив источников данных
}
// Первый источник данных (эмуляция данных с веб-портала)
source_web: {
name: "Source_web step in job EXAMPLE_NAME"
type: "source"
topic: "WEB.RESERVATION_EVENT.V1"
config: ${defaults.kafka} {
consumer: {
"group.id": "EXAMPLE_NAME_source_web" // Отдельная consumer-group для данного топика
"client.id": "event-process-flow-EXAMPLE_NAME_source_web"
}
producer: {
"client.id": "event-process-flow-producer"
}
}
destination: ${mergeStep}
}
// Второй источник данных (эмуляция данных из мобильного приложения)
source_app: {
name: "Source_app step in job EXAMPLE_NAME"
type: "source"
topic: "MOB.RESERVATION_EVENT.V1"
config: ${defaults.kafka} {
consumer: {
"group.id": "EXAMPLE_NAME_source_app" // Отдельная consumer-group для данного топика
"client.id": "event-process-flow-EXAMPLE_NAME_source_app"
}
producer: {
"client.id": "event-process-flow-producer"
}
}
destination: ${mergeStep}
}
// Объединение потоков от двух источников данных в общий поток обработки
mergeStep: {
name: "Merge source step in job EXAMPLE_NAME"
type: "merge"
destination: ${transformStep}
}
// Шаг трасформации - разбивка базового события на отдельные тематические события
transformStep: {
name: "Transformation step in job EXAMPLE_NAME"
type: "dsl"
format: {
input: "json"
output: "json"
}
dsl: {
source: "file"
path: "transformStep.tr"
}
destination: {
type: "branch"
destination: ${destinationFlight}
branches: [${destinationBooking}, ${transformTransfer}]
}
}
// Шаг трансформации для преобразования выходного сообщения по бронированию трансфера в формат XML
transformTransfer: {
name: "TransformTransfer step in job EXAMPLE_NAME"
type: "dsl"
format: {
input: "json"
output: "xml"
}
dsl: {
source: "file"
path: "transformTransfer.tr"
}
destination: ${destinationTransfer}
}
// Первый выходной поток данных - события бронирования билета на рейс
destinationFlight: {
name: "OutputFlight step in job EXAMPLE_NAME"
type: "destination"
topic: "CEP.TICKET_RESERVING_EVENT.V1"
config: ${defaults.kafka}
}
// Второй выходной поток данных - события бронирования трансфера
destinationTransfer: {
name: "OutputTransfer step in job EXAMPLE_NAME"
type: "destination"
topic: "CEP.TRANSFER_RESERVING_EVENT.V1"
config: ${defaults.kafka}
}
// Третий выходной поток данных - события бронирования отеля
destinationBooking: {
name: "OutputBooking step in job EXAMPLE_NAME"
type: "destination"
topic: "CEP.HOTEL_RESERVING_EVENT.V1"
config: ${defaults.kafka}
}
}
Конфигурация преобразования#
Основной шаг трансформации#
define logInfo = "Job EXAMPLE_NAME got message with messageId = " || in.message.header.messageId || " in step transformStep"
info($logInfo, in.message.header.messageId)
define index = 0i
if (exists(in.message.body.flightDetails)) {
define logInfo = "Job EXAMPLE_NAME step transformation part flightDetails, index = " || $index || " for message with messageId = " || in.message.header.messageId
info($logInfo)
headers[$index].kafka = in.message.header
headers[$index].kafka.parentId = in.message.header.messageId
headers[$index].kafka.messageId = generateId()
out[$index].message.body.passenger = in.message.body.passenger
out[$index].message.body.flightDetails = in.message.body.flightDetails
headers[$index].BranchName = "OutputFlight step in job EXAMPLE_NAME"
define logInfo = "Job EXAMPLE_NAME successfully sent message: " || out[$index] || " with $index = " || $index || " to Branch " || headers[$index].BranchName
INFO($logInfo, in.message.header.messageId)
}
if (exists(in.message.body.transferDetails)) {
if (exists(in.message.body.flightDetails)) {
define index = 1i
}
define logInfo = "Job EXAMPLE_NAME step transformation part transferDetails, index = " || $index || " for message with messageId = " || in.message.header.messageId
info($logInfo, in.message.header.messageId)
headers[$index].kafka.dateTime = in.message.header.dateTime
headers[$index].kafka.sourceType = in.message.header.sourceType
headers[$index].kafka.parentId = in.message.header.messageId
out[$index].message.body.passenger = in.message.body.passenger
out[$index].message.body.transferDetails = in.message.body.transferDetails
headers[$index].BranchName = "TransformTransfer step in job EXAMPLE_NAME"
define logInfo = "Job EXAMPLE_NAME successfully sent message: " || out[$index] || " with $index = " || $index || " to Branch " || headers[$index].BranchName
INFO($logInfo, in.message.header.messageId)
}
if (exists(in.message.body.bookingDetails)) {
if (exists(in.message.body.flightDetails) and exists(in.message.body.transferDetails)) {
define index = 2i
} else if ((exists(in.message.body.flightDetails) and !exists(in.message.body.transferDetails)) or (!exists(in.message.body.flightDetails) and exists(in.message.body.transferDetails))) {
define index = 1i
}
define logInfo = "Job EXAMPLE_NAME step transformation part bookingDetails, index = " || $index || " for message with messageId = " || in.message.header.messageId
info($logInfo, in.message.header.messageId)
headers[$index].kafka = in.message.header
headers[$index].kafka.parentId = in.message.header.messageId
headers[$index].kafka.messageId = generateId()
out[$index].message.body.passenger = in.message.body.passenger
out[$index].message.body.bookingDetails = in.message.body.bookingDetails
headers[$index].BranchName = "OutputBooking step in job EXAMPLE_NAME"
define logInfo = "Job EXAMPLE_NAME successfully sent message: " || out[$index] || " with $index = " || $index || " to Branch " || headers[$index].BranchName
INFO($logInfo, in.message.header.messageId)
}
Дополнительный шаг трансформации для преобразования формата#
define logInfo = "Job EXAMPLE_NA got message with messageId = " || in.message.header.messageId || " and parentId = " || in.message.header.parentId || " in step transformTransfer and transformed it to XML"
info($logInfo, in.message.header.parentId)
headers.kafka.dateTime = Environment.kafka.headers.dateTime
headers.kafka.sourceType = Environment.kafka.headers.sourceType
headers.kafka.parentId = Environment.kafka.headers.parentId
headers.kafka.messageId = generateId()
out.message.body.passenger = in.message.body.passenger
out.message.body.transferDetails = in.message.body.transferDetails