Пример потока обработки#

Конфигурация потока обработки#

{
  // Базовый блок потока
  flow: {
    name: "EXAMPLE_NAME" // Наименование потока
    source: [${source_web}, ${source_app}] // Массив источников данных
  }
  
  // Первый источник данных (эмуляция данных с веб-портала)
  source_web: {
    name: "Source_web step in job EXAMPLE_NAME"
    type: "source"
    topic: "WEB.RESERVATION_EVENT.V1"
    config: ${defaults.kafka} {
        consumer: {
          "group.id": "EXAMPLE_NAME_source_web" // Отдельная consumer-group для данного топика
          "client.id": "event-process-flow-EXAMPLE_NAME_source_web"
        }
        producer: {
          "client.id": "event-process-flow-producer"
        }
    }
    destination: ${mergeStep}
  }

  // Второй источник данных (эмуляция данных из мобильного приложения)
  source_app: {
    name: "Source_app step in job EXAMPLE_NAME"
    type: "source"
    topic: "MOB.RESERVATION_EVENT.V1"
    config: ${defaults.kafka} {
        consumer: {
          "group.id": "EXAMPLE_NAME_source_app" // Отдельная consumer-group для данного топика
          "client.id": "event-process-flow-EXAMPLE_NAME_source_app"
        }
        producer: {
          "client.id": "event-process-flow-producer"
        }
    }
    destination: ${mergeStep}
  }

  // Объединение потоков от двух источников данных в общий поток обработки
  mergeStep: {
    name: "Merge source step in job EXAMPLE_NAME"
    type: "merge"
    destination: ${transformStep}
  }

  // Шаг трасформации - разбивка базового события на отдельные тематические события
  transformStep: {
    name: "Transformation step in job EXAMPLE_NAME"
    type: "dsl"
    format: {
      input: "json"
      output: "json"
    }
    dsl: {
      source: "file"
      path: "transformStep.tr"
    }
    destination: {
      type: "branch"
      destination: ${destinationFlight}
      branches: [${destinationBooking}, ${transformTransfer}]
    }
  }

  // Шаг трансформации для преобразования выходного сообщения по бронированию трансфера в формат XML
  transformTransfer: {
    name: "TransformTransfer step in job EXAMPLE_NAME"
    type: "dsl"
    format: {
      input: "json"
      output: "xml"
    }
    dsl: {
      source: "file"
      path: "transformTransfer.tr"
    }
    destination: ${destinationTransfer}
  }

  // Первый выходной поток данных - события бронирования билета на рейс
  destinationFlight: {
    name: "OutputFlight step in job EXAMPLE_NAME"
    type: "destination"
    topic: "CEP.TICKET_RESERVING_EVENT.V1"
    config: ${defaults.kafka}
  }

  // Второй выходной поток данных - события бронирования трансфера
  destinationTransfer: {
    name: "OutputTransfer step in job EXAMPLE_NAME"
    type: "destination"
    topic: "CEP.TRANSFER_RESERVING_EVENT.V1"
    config: ${defaults.kafka}
  }  

  // Третий выходной поток данных - события бронирования отеля
  destinationBooking: {
    name: "OutputBooking step in job EXAMPLE_NAME"
    type: "destination"
    topic: "CEP.HOTEL_RESERVING_EVENT.V1"
    config: ${defaults.kafka}
  }
}

Конфигурация преобразования#

Основной шаг трансформации#

define logInfo = "Job EXAMPLE_NAME got message with messageId = " || in.message.header.messageId || " in step transformStep"
info($logInfo, in.message.header.messageId)

define index = 0i

if (exists(in.message.body.flightDetails)) {
    define logInfo = "Job EXAMPLE_NAME step transformation part flightDetails, index = " || $index || " for message with messageId = " || in.message.header.messageId
    info($logInfo)

    headers[$index].kafka = in.message.header
    headers[$index].kafka.parentId = in.message.header.messageId
    headers[$index].kafka.messageId = generateId()
    out[$index].message.body.passenger = in.message.body.passenger
    out[$index].message.body.flightDetails = in.message.body.flightDetails

    headers[$index].BranchName = "OutputFlight step in job EXAMPLE_NAME"

    define logInfo = "Job EXAMPLE_NAME successfully sent message: " || out[$index] || " with $index = " || $index || " to Branch " || headers[$index].BranchName
    INFO($logInfo, in.message.header.messageId)
}

if (exists(in.message.body.transferDetails)) {
    if (exists(in.message.body.flightDetails)) {
        define index = 1i
    }
    define logInfo = "Job EXAMPLE_NAME step transformation part transferDetails, index = " || $index || " for message with messageId = " || in.message.header.messageId
    info($logInfo, in.message.header.messageId)

    headers[$index].kafka.dateTime = in.message.header.dateTime
    headers[$index].kafka.sourceType = in.message.header.sourceType
    headers[$index].kafka.parentId = in.message.header.messageId
    out[$index].message.body.passenger = in.message.body.passenger
    out[$index].message.body.transferDetails = in.message.body.transferDetails

    headers[$index].BranchName = "TransformTransfer step in job EXAMPLE_NAME"

    define logInfo = "Job EXAMPLE_NAME successfully sent message: " || out[$index] || " with $index = " || $index || " to Branch " || headers[$index].BranchName
    INFO($logInfo, in.message.header.messageId)
}

if (exists(in.message.body.bookingDetails)) {
    if (exists(in.message.body.flightDetails) and exists(in.message.body.transferDetails)) {
         define index = 2i
    } else if ((exists(in.message.body.flightDetails) and !exists(in.message.body.transferDetails)) or (!exists(in.message.body.flightDetails) and exists(in.message.body.transferDetails))) {
        define index = 1i
    }
    define logInfo = "Job EXAMPLE_NAME step transformation part bookingDetails, index = " || $index || " for message with messageId = " || in.message.header.messageId
    info($logInfo, in.message.header.messageId)

    headers[$index].kafka = in.message.header
    headers[$index].kafka.parentId = in.message.header.messageId
    headers[$index].kafka.messageId = generateId()
    out[$index].message.body.passenger = in.message.body.passenger
    out[$index].message.body.bookingDetails = in.message.body.bookingDetails

    headers[$index].BranchName = "OutputBooking step in job EXAMPLE_NAME"

    define logInfo = "Job EXAMPLE_NAME successfully sent message: " || out[$index] || " with $index = " || $index || " to Branch " || headers[$index].BranchName
    INFO($logInfo, in.message.header.messageId)
}

Дополнительный шаг трансформации для преобразования формата#

define logInfo = "Job EXAMPLE_NA got message with messageId = " || in.message.header.messageId || " and parentId = " || in.message.header.parentId || " in step transformTransfer and transformed it to XML"
info($logInfo, in.message.header.parentId)

headers.kafka.dateTime = Environment.kafka.headers.dateTime
headers.kafka.sourceType = Environment.kafka.headers.sourceType
headers.kafka.parentId = Environment.kafka.headers.parentId
headers.kafka.messageId = generateId()
out.message.body.passenger = in.message.body.passenger
out.message.body.transferDetails = in.message.body.transferDetails