Реализация протокола для 4 поколения#

Реализация#

Реализация представляет собой вариант RPC over Kafka, асинхронное взаимодействие посредством RPC-вызовов, инкапсулируемых на транспортном уровне в сообщения Kafka.

Для взаимодействия технически применяется отдельный кластер Kafka («транспортная Kafka», «Kafka 4-3»), расположенная в том же сетевом сегменте, что и Архивирование (ARCH) и Platform V в целом.

На прикладном уровне протокол остается прежним. С клиентской стороны обращения к транспортной шине Kafka инкапсулируются клиентской библиотекой Архивирования (ARCH) (артефакт data-transport-api).

Работа напрямую с топиками Kafka на стороне Источников не предполагается, но технически возможна.

Поскольку протокол подразумевает асинхронное взаимодействие, для взаимодействия с одним Источником используются четыре топика: два топика на обработку запросов «Архивирование (ARCH) → Источник» и 2 топика на обработку обратного запроса (ответа) «Источник → Архивирование (ARCH). Каждая пара топиков предполагает двунаправленное, кратковременное, псевдосинхронное взаимодействие: в первый топик помещается сообщение-запрос, а во второй – квитовка (ответ) на него, результат выполнения RPC-вызова.

Описание топиков:

  1. Топик запросов request – в него со стороны Архивирования (ARCH) помещается пакет, содержащий контейнер с запросом к Источнику. Список запросов:

    • initLoad

    • getBatchCount

    • loadBatchAsync

    • abort

    • getQualitySample

    • getQualityBatchSize

    • getQualityBatch

  2. Топик синхронных ответов response – в него помещаются ответы Источника, содержащие данные на запросы:

    • initLoad

    • getBatchCount

    • getQualityBatchSize

    На все остальные пакеты ответ–подтверждение (ACK) технически представляет собой пустой контейнер ответа, сигнализирующий о том, что запрос был получен.

  3. Топик асинхронных ответов async – в него помещаются асинхронные ответы Источника на запросы:

    • loadBatchAsync (вызов loadInitBatch);

    • getQualitySample (вызов loadQualitySample);

    • getQualityBatch (вызов LoadQualityBatch).

  4. Топик подтверждений асинхронных ответов confirm (обратно направленный в сторону Источника топик ответов) – в него помещаются ответы–подтверждения (ACK) Архивирования (ARCH). Технически это пустой контейнер ответа, сигнализирующий Источнику о том, что запрос был получен.

Топики технически размещаются в Kafka Прикладного журнала (APLJ). Топики заводятся группами по одной группе на шард Источника – так у каждой зоны Источника в случае шардирования будет своя группа топиков. В случае отсутствия шардирования – зона default.

Топики заводятся в следующей нотации: <Полигон>-<Мнемоника Источника>- <Зона топологии>-<Тип топика>. Тип топика согласно вышеперечисленным: request/response/async/confirm.

Маршрутизация сообщений по топологии простая. Сообщения направляются в топики, соответствующие полигону Архивирования (ARCH), мнемонике Источника и заявленным зонам согласно значениям параметров топологии из конфигурации Источника. Для отправки ответов и асинхронных запросов со стороны Источника для маршрутизации сообщений используются настройки на стороне приложений Источников, хранящиеся в конфигурации OpenShift в контексте Источника.

Из-за multi-instance архитектуры Архивирования (ARCH) и Источника может быть более одного экземпляра как модуля Источника, так и модуля Архивирования (ARCH), отправляющего запросы. Это решается тем, что на стороне Источника не должно быть такого состояния, которое определяет, что обработать некоторый запрос и сформировать ответ на него может только определенный узел.

На стороне Архивирования (ARCH) взаимодействие с Источником требует хранения состояния. Поэтому нужно обеспечить вычитку и получение именно того узла, который отправил запрос. Для этого:

  1. Каждый узел Архивирования (ARCH) (и каждый узел Источника при работе с асинхронным потоком) читает топик уникальным потребителем (для каждого узла – своя группа потребителей).

  2. В контейнер запроса и ответа добавляется информация об узле, который отправил запрос (для запросов), и эта ⁣же ⁣информация передается обратно (дублируется) обрабатывающей запрос стороной в ответ (для ответов). На стороне узлов, обрабатывающих ответы, вычитываются таким образом все ответы, но отфильтровываются только те, которые этому узлу предназначены. Это увеличивает нагрузку при чтении топиков, но упрощает маршрутизацию.

Схема работы с топиками:

Соглашения о работе с топиками#

Топики заводятся в следующей нотации: <Полигон>-<Мнемоника Источника>- <Зона топологии>-<Тип топика>.

  • Полигон – соответственно ИФТ-ПСИ-ПРОМ РБ-КБ-PCI DSS.

  • Мнемоника Источника – согласно той, под которой Источник заводится в подсистеме конфигурирования Архивирования (ARCH). Определяется Источником.

  • Зона топологии – согласно топологии Источника, определяется из системы параметризации Архивирования (ARCH), передается Источником в составе дистрибутива.

  • Тип топика – один из следующих вариантов:

    • request – запросы от Архивирования (ARCH) к Источнику;

    • response – ответы Источника на запросы в сторону Архивирования (ARCH);

    • async – асинхронные запросы на принимающее API Архивирования (ARCH) в сторону Архивирования (ARCH);

    • confirm – ответы Архивирования (ARCH) на асинхронные запросы (подтверждение приема).

Пример:

Источник с мнемоникой MNENO_NAME на полигоне ПСИ РБ в зоне test_zone будет иметь топики:

psi-rb-MNENO_NAME-test_zone-request psi-rb-MNENO_NAME-test_zone-response psi-rb-MNENO_NAME-test_zone-async psi-rb-MNENO_NAME-test_zone-confirm

Доступ к топикам разграничивается на основании ACL пользовательских сертификатов. Организуется следующими правилами:

Топик

Источник

Архивирование (ARCH)

request

read, describe

write, describe

response

write, describe

read, describe

async

write, describe

read, describe

confirm

read, describe

write, describe

Транспортные сообщения и формат контейнеров#

Транспортный формат сообщений (JSON): запросы и ответы на транспортном уровне (Архивирование (ARCH) 4 поколения – взаимодействие выполняется через интеграционную Kafka).

Запрос:

{
  "type": "object",
  "properties": {
    "request_id": {
      "type": "string"
    },
    "sender_node": {
      "type": "string"
    },
    "sender_timestamp": {
      "type": "string"
    },
    "request": {
      "type": "object",
      "properties": {
        "method": {
          "type": "string"
        },
        "parameters": {
          "type": "array",
          "items": {
            "type": "object",
            "properties": {
              "name": {
                "type": "string"
              },
              "type": {
                "type": "string"
              },
              "value": {
                "type": "any"
              }
            }
          }
        }
      }
    }
  }
}

Если в формате запроса (properties) значение type представлено не простым типом, то valueJsonObject;

Ответ:

{
  "type" : "object",
   "properties" : {
    "request_id" : {
      "type" : "string"
    },
    "correlation_id" : {
      "type" : "string"
    },
    "target_node" : {
      "type" : "string"
    },
    "response_timestamp" : {
      "type" : "string"
    },
    "exception" : {
      "type" : "string"
    },
    "result" : {
      "type" : "object",
       "properties" : {
        "type" : {
          "type" : "string"
        },
        "value" : {
          "type" : "any"
        }
      }
    }
  }
}

Примеры сообщений:

Запрос:

{
  "request_id": "1234-5678-4635-6453",
  "sender_node": "34",
  "sender_timestamp": "2021-03-17 22:01:29.611",
  "request": {
    "method": "initLoad",
    "parameters": [
      {
        "name": "arg1",
        "type": "String",
        "value": "... payload"
      },
      {
        "name": "arg2",
        "type": "Integer",
        "value": 1
      },
      {
        "name": "arg3",
        "type": "com.sbt.pprbod.data.transport.entity.LoadResult",
        "value": {
          "foo": "bar",
          "field1": "1234"
        }
      }
    ]
  }
}

Ответ:

{
  "request_id": "9876-5432-10000-641000",
  "correlation_id": "1234-5678-4635-6453",
  "target_node": "34",
  "response_timestamp": "2021-03-17 22:01:29.611",
  "exception": "",
  "result": {
    "type": "String",
    "value": "... payload"
  }
}