Подключение и конфигурирование#

Требования для интеграции источника в рамках целевой DEVOPS схемы#

  1. Источник должен использовать один из перечисленных ниже вариантов представления собственной модели данных:

    • Java DTO классы.

    • Работа с данными с использованием ORM библиотеки Platform V Persistence в 3 или в 4 поколении Платформы.

    • Работа с данными посредством использования платформенного решения 4 поколения Платформы — Platform V DataSpace.

    • Работа через платформенный БПС Сделки в 3 поколении Платформы.

  2. Источник должен иметь интеграцию с SW META, а именно:

    • Иметь опубликованную ЛМ АС в SW META (Инструкция для Источника по публикации ЛМ АС).

    • Вести разметку белых списков в разрезе типов данных и аттрибутов в SW META.

  3. Источник должен иметь настроенный сборочный конвейер с возможностью добавлять в него внешние фазы в виде сторонних задач.

  4. Источник должен иметь настроенный релизный DPM конвейер с возможностью добавлять в него внешние этапы в виде требуемых задач.

Интеграция с SW META является обязательной для источников, начиная с 3 волны и далее. Источники пилота и первых двух волн загрузки могут использовать передачу файлов модели и белого списка непосредственно в конвейер, при условии фиксации технического долга по интеграции с АС «МЕТА».

Общее описание DEVOPS-этапов для интеграции источника (Сборка и ИФТ)#

Этапы разделяются на фазы, выполняемые как на стадии сборки дистрибутива в сборочном конвейере, так и на стадии развертывания дистрибутива источника через релизный DPM конвейер.

Стадия сборки дистрибутива включает в себя следующие этапы:

  • Для источников, использующих Java DTO классы — подключение в проект плагина Archiving для генерации дескриптора модели 3. Подключение Maven плагина в проект Источника. Для таких источников предусмотрена возможность: 2. Разметка физической модели источника аннотациями — предназначена для унификации работы Archiving и устранения необходимости кодирования данной разметки в коде Archiving. В целевом варианте работы на универсальном формате вектора изменений такая разметка не требуется.

  • Для источников, использующих иное представление модели данных, кроме DTO — подключение плагина не требуется.

  • Подключение фазы B-Pipeline 4. B-Pipeline. Сборка конфигурации (конфигурационного архива) для Archiving для сбора от источника и из SW META и включения в свой собственный дистрибутив параметров и артефактов для Archiving — согласно требованиям к составу дистрибутива.

Стадии сборочной фазы B-Pipeline и процесс работы отображены на схеме:

Стадия развертывания дистрибутива включает в себя этапы, обеспечивающие проверку дистрибутива, конфигурирование Archiving для источника и прогон смок-регресс тестов источника. По результатам каждой фазы проставляются соответствующие Quality Gate.

Подробно настройка подзадачи и фазы DPM конвейера рассматриваются в разделах:

  • R-Pipeline. ИФТ. Валидация дистрибутива источника;

  • R-Pipeline. ИФТ. Фаза конфигурирования экземпляра Archiving;

  • R-Pipeline. ИФТ. Smoke-Regress тесты;

  • R-Pipeline. ИФТ. Настройка DPM.

Стадии релизной фазы R-Pipeline и процесс работы показаны на схеме:

Примечание

Прохождение этапов синхронизируется по QG-флагу, который соответствует стенду. На схеме ниже используется QG pprbod.ift , так как процесс начинается с ИФТ-стенда. На следующих этапах QG будет pprbod.uat (для ПСИ).

После проверки флагов pprbod.ift = ready и pprbod.ift = valid, которые, в свою очередь, устанавливаются источниками после успешной установки дистрибутива на Archiving и на источнике, и после проверки в DPM флагов Smoke-Regress тестов Archiving (блок автотестов реализуется Archiving по его внутренним Методикам) и Интеграционных автотестов источника (реализуются источником согласно Методикам тестирования).

Описание DEVOPS-этапов для фазы ПСИ#

На фазе ПСИ выполняется только задача конфигурирования Archiving на полигоне ПСИ и выставляются соответствующие флаги DPM.

Со стороны Archiving — это фаза конфигурирования источника (tsa_configure). Конфигурирование выполняется средствами той же библиотеки tsajlib, описанной в разделе «R-Pipeline. ИФТ. Фаза конфигурирования инстанса Archiving». Отличие состоит в том, что в качестве полигона передается полигон ПСИ.

По итогу успешного выполнения фазы проставляется флаг pprbod.uat = ready, свидетельствующий о готовности Archiving на полигоне ПСИ к работе. Со стороны КАП выполняется подготовка и публикация плагина, а также выставляется флаг pprbod.uat = valid, свидетельствующий о готовности источника к проведению ПСИ на полигоне.

Обязательным предусловием конфигурирования Archiving на ПРОМ является наличие флага pprbod.uat, установленного в значение ok и pprbod.uat, установленного в значение valid.

Со стороны Archiving — это фаза выполнения конфигурирования источника (tsa_configure). Конфигурирование выполняется средствами той же библиотеки tsajlib, но на ПРОМ полигоне.

Библиотеки TSAJLIB (исходные коды) поставляются на полигоны ПСИ и ПРОМ в составе дистрибутивов Archiving, являясь их неотъемлемой частью. Они строго синхронизированы по версиям. Кодовая база tsajlib строго должна соответствовать кодовой базе Archiving на полигоне.

Состав дистрибутива конфигурации источника#

Дистрибутив источника Archiving поставляется в виде zip-архива.

Дистрибутив должен быть собран в соответствии с форматом поставки Install EIP. Для Archiving дистрибутив источника включает артефакт с конфигурацией источника. Этот файл содержит артефакты конфигурации, необходимые для корректной работы Archiving с версией модели данных источника, соответствующей текущему выпускаемому дистрибутиву.

Структурно для install_EIP артефакты конфигурирования размещаются в папке ./other/model. В нее помещаются:

  1. Aрхив, содержащий файлы конфигурации — это артефакт с конфигурацией источника, передаваемой в Archiving.

  2. AVRO — схемы модели источника, сгенерированные на фазе сборки дистрибутива по актуальной версии модели данных, белому и черному спискам.

    Схем модели источника две: полная (model_schemas_full.txt) и усеченная по черному и белому списку (model_schemas.txt), которая и является рабочей для КАП. Эти схемы генерируются на фазе «B-Pipeline. Сборка конфигурации для Archiving».

  3. Файл sourceDescription.yml, содержащий параметры источника. Создается и предоставляется самим источником в процессе сборки дистрибутива. Все параметры должны быть строкового типа, даже те, в которые входит число, например:

    Не верно:

    dataSampleThreshold: 1000

    Верно:

    dataSampleThreshold: "1000"

    Параметрами источника являются:

    • name - мнемонический идентификатор источника в верхнем регистре (например,UVSK_B2C_APPEALS_CLASS). Предпочтительно, чтобы мнемоника отражала в себе название Прикладной фабрики, например, для фабрики «Главная бухгалтерская книга» выбрана мнемоника «GBK», согласно аббревиатуре названия фабрики.

      Важно

      Мнемоника должна быть уникальна внутри конфигурации ТС Архивирование.

      Обязательным требованием является согласование мнемоники при первичной интеграции с Archiving. В процессе (после первичной интеграции) эта мнемоника более не меняется для источника. Мнемоника источника не может отличаться для различных контуров и полигонов, на которых представлен модуль источника.

      Примечание

      Мнемоника не может содержать дефис (минус). При необходимости используйте знак нижнего подчеркивания (NAME_DATA_SOURCE). Максимальная длина — 25 символов.

      Пример:

      Не верно: FAB-12 Fab_12

      Верно:

      FAB_12

    • description — человекочитаемое описание источника согласно ARIS (например, «Гамма СВБ – Главная бухгалтерская книга (ГБК)»).

    • modelType — тип модели данных, применяемых источником (один из вариантов):

      Тип источника

      Передаваемое значение/modelType

      Источники, генерирующие дескриптор модели на основе кастомных DTO-классов

      CLASSES

      Источники на БПС.Cделки

      EIP_METAMODEL

      Источники на Маппере, транспортном формате 4.0 и DataSpace

      MAPPER_METAMODEL

      Зарезервировано для будущих версий

      SCHEMAS

    • whiteListMode — режим работы черного и белого списков (один из вариантов: ROOT_TYPES, ENABLED, DISABLED),

      где

      Параметр

      Описание

      ENABLED

      Самый жесткий уровень фильтрации. В белом списке должны присутствовать не только типы объектов, но и перечень их атрибутов

      ROOT_TYPES

      Cредний уровень фильтрации. В белом списке должны присутствовать только типы объектов, все их атрибуты будут переданы, даже если атрибуты в списке не указаны

      DISABLED

      Передаются все объекты, которые приходят в адаптер

    • verboseKey — по умолчанию false (выключен). Включается в исключительных случаях только после согласования с КАП.

      Если включен (true) - то в каждой записи, реплицируемой в КАП, к идентификатору будет добавляться префикс именни класса.

      Примеры:

      • если VerboseKey = false, ID записи будет выглядеть так: 11223344;

      • если VerboseKey = true, ID будет выглядеть так: com.sbt.className.11223344.

    • initSourceModuleId (запрещено к использованию) — идентификатор модуля Платформы для инита (например, gbk-main). Если источник использует БПС «Сделки», то необходимо указывать модуль через который происходит взаимодействие с Archiving (mega-bas).

      Подробнее об этом параметре читайте в статье «Инициализирующая выгрузка (Инит) для источников, использующих Маппер, БПС «Сделки» и DataSpace». ( п. «Настройка Фонового Процесса в BGP», параметр module)

      Важно!

      Параметр выводится из эксплуатации. Если у вас настроен Devops с заполнением параметра, можно все оставить без изменений. Работоспособность не изменится. Новым источникам заполнять параметр не нужно.

    • dqModuleId — идентификатор модуля Платформы, через который осуществляется ТКД и офлайн ТКД, например, gbk-main.

      Если источник использует БПС «Сделки», то необходимо указывать модуль, через который происходит взаимодействие с ТС Архивирование (mega-bas).

      Информация

      Для источников, использующих для интеграции протокол 4-3 («Подключение транспортной библиотеки Kafka (4-3) на стороне потребителя») в качестве идентификатора модуля используется мнемоника источника, задаваемая в application.yml в параметре pprbod.cloud.client.source. т.е. в качестве dqModuleId нужно передавать ее.

    • ajZones — список зон ПЖ (это множество параметров, представленное конкретными параметрами для полигонов в виде <полигон>):

      • ift;

      • psi;

      • prom.

      Если параметры по полигону отсутствуют, то при установке на этом полигоне применяется значение базового ajZones, Archiving при работе будет вычитывать журнал в этой зоне.

      Если зона еще не существует, то нужно зарегистрировать ее в нужных контурах по шаблонам Прикладного журнала.

    • ajDataTypes — типы данных ПЖ (перечисляются через точку с запятой). Представляет собой аналогичное множество:

      • ift;

      • psi;

      • prom.

      Если параметры по полигону отсутствуют, то при установке на этом полигоне применяется значение базового ajDataTypes. Archiving при работе будет вычитывать указанные типы данных в указанных зонах.

    • mmtZones - список зон ММТ (перечисляются через точку с запятой), представляет собой аналогичное множество:

      • ift;

      • psi;

      • prom.

      Информация

      Для источников, использующих для интеграции протокол 4-3 («Подключение транспортной библиотеки Kafka (4-3) на стороне потребителя») в качестве идентификатора ММТ зоны нужно передавать то, что указано в параметре pprbod.cloud.client.zone файла application.yml.

      Если у источника в топологии существует одна зона шардирования, в данном параметре и в параметре pprbod.cloud.client.zone указывается default. На данный момент ТС Архивирование не поддерживает более одной зоны шардирования у источника.

    • nexusArtifactId — artifactId фабрики в Nexus. Нужен для простановки флагов QGM.

      Пример: *.setArtifact("***ci01072092_pprbod***").

      Если параметры по полигону отсутствуют, то при установке на этом полигоне применяется значение базового параметра mmtZones.

    • nexusGroupId — groupId фабрики в Nexus. Нужен КАП для простановки в пространстве QGM источника флагов: pprbod_ift_ready и pprbod_ift_valid.

    • offDQRequestSenderFactoryMode — режим работы offline-ТКД 06. Методика анализа расхождений оффлайн ТКД (один из вариантов):

      • SYNC_WITH_SAMPLES (синхронная передача данных с поддержкой сэмплов). Используют источники на БПС Сделке, Маппере, и источники на универсальном векторе изменений (транспортный формат 4.0) или DTO классах, реализующих Синхронный API Init V2 из инструкции: API ТС Архивирование Archiving V4.

      • KAFKA_WITH_SAMPLES (передача данных с поддержкой примеров, поверх Kafka). Используют источники 4 поколения платформы, взаимодействующие с ТС Архивирование через Kafka (без использования ММТ), использующие для интеграции с ТС Архивирование Маппер 4 поколения, транспортный формат 4.0, или Dataspace.

      • SYNC (синхронная передача данных). Используют источники, реализующие Синхронный API Init V1 из инструкции: «API ТС Архивирование Archiving V4».

    • nonRootTypeHandlingMode — параметр, указывающий необходимость конвертации запрошенного типа в корневой (иными словами, отдает ли источник не только корневые, но и дочерние типы). В случае, когда источник отдает и рутовые, и дочерние типы, параметр устанавливается в значение AS_IS. Если же отдаваемые типы - только рутовые, то значение параметра устанавливается как SEND_ROOT.

    • initiator — эквивалентно содержимому поля initiator заголовка журнала ПЖ, то есть названию модуля, который отправляет данные в Прикладной журнал.

    • capWaitCommit — true или false - необязательный параметр, ожидать ли подтверждение на доставку сообщений от КАП. По умолчанию значение false.

    • deserializerName — название класса реализации интерфейса модели плагина PprbodSourcePlugin.

      Для источников на Маппере com.sbt.pprbod.artifacts.mapper.MapperPprbodSourcePlugin.

      На БПС.Сделке com.sbt.pprbod.artifacts.cdm.CdmPprbodSourcePlugin.

      Для источников на универсальном векторе ПЖ (транспортный формат 4.0) и DataSpace com.sbt.pprbod.artifacts.mapper.MapperJsonPprbodSourcePlugin.

      Для остальных источников значение параметра равно полному именни класса реализации интерфейса модели плагина PprbodSourcePlugin, например com.sbt.pprbod.artifacts.gbk.GbkPprbodSourcePlugin.

    • modelProviderType - тип провайдера модели определяется исходя из типа интеграции с Archiving:

      • MAPPER - источники использующие Маппер, транспортный формат 4.0 или Dataspace;

      • CDM - источники использующие БПС Сделки (mega-bas);

      • ANNOTATION - источники использующие собственную реализацию модели, размеченную аннотациями Archiving;

      • CUSTOM - источники использующие собственную реализацию провайдера модели, интегрирующие ее в Archiving через плагин.

    Параметры для интеграции с АС МЕТА:

    • componentCode — код компонента (код АС,ФП, Сервиса), задается при генерации или загрузке модели для МЕТА на стороне АС Источника, например, "APP712".

      Параметр является обязательным, если не предоставлен arisGUID( ранее componentGUID). Строго обязательный при работе со старой META.HUB (до ввода в работу Инфоактивы - текущий вариант)

    • arisGUID - (ранее был componentGUID) — ARIS GUID (код АС,ФП, Сервиса), задается при генерации или загрузке модели для МЕТА из ARIS на стороне АС Источника, например dfeb18a1-e62e-11e7-710f-fa163edfb21e.

      Параметр является обязательным, если не задан componentCode.

      Требования по работе с АС МЕТА

      Логика вычисления:

      Если в параметрах функции configArhive.create(modelPath) передается путь к файлу с метамоделью и в параметре whiteListPath передан путь к файлу белого списка («B-Pipeline. Сборка конфигурации (конфигурационного архива) для Archiving»), Archiving возьмет для обработки эти файлы. Если же в этих параметрах передано значение NULL, то по параметру componentCode Archiving обращается в АС «МЕТА» и запрашивает метамодель и белый список. Если запрос не вернет метамодель, методы Archiving не выполнятся и дальнейшая работа будет невозможна!

      Если запрос успешен, то схемы АВРО будут генерироваться именно из полученной от МЕТА информации.

      Внимание!

      Для источников пилота и первой волны требование не обязательно, Archiving обработает метамодель и вайтлист и в виде файлов!

    • dataSampleThreshold (опциональный) — для ТКД число объектов, которое надо запросить у источника. Значение 0 выключено. При отсутствии параметра, по умолчанию берется "0".

    • initTopicMode (опциональный) — обозначает то, в какой топик выгружать данные в ходе инита (топик данных или отдельный топик инита). Допустимые значения: data, init.

      Если указано другое значение, фаза конфигурации выдаст ошибку.

      Когда initTopicMode отсутствует в sourceDescription.yml передается значение по умолчанию: data.

B-Pipeline. Сборка конфигурации (конфигурационного архива) для Archiving#

Внимание!

"Сборка конфигурации" предполагает дополнение сборочного архива источника файлами, предназначенными для настройки Archiving.

Внутренний состав сборки никак не затрагивается, собирается командой источника без изменения технологического процесса.

Функциональность по сборке конфигурации для Archiving реализована в виде Jenkins-библиотеки (далее по тексту — JSL).

Назначение функциональности: сборка артефактов конфигурации Archiving, необходимых для конфигурирования экземпляра Archiving на полигонах.

Функциональность (в виде JSL) предназначена для встраивания в сборочный Pipeline источника непосредственно после сборки артефактов самого источника, после того, как:

  1. Артефакты источника собраны.

  2. Плагином Archiving на этапе выполнения Maven-сборки сгенерирован файл дескриптора модели.

Важно

Файл дескриптора модели генерируется плагином только для источников, использующих в качестве DTO артефактов java-классы. Для источников, использующих Mapper или Avro, в качестве дескриптора должно передаваться XML-описание объектов Mapper, либо Avro-схема соответственно.

Функциональность сборки включает в себя:

  1. Работу со SW META, а именно:

    • Получение из SW META логической модели ЛМ АС.

      ТУЗ для МЕТА

      Для работы методов Archiving с ПРОМ МЕТА и получения вашей актуальной модели и белого списка необходимо наличие ТУЗа у каждого источника.

      ТУЗ в ПРОМ для МЕТА создается только по распоряжению, согласованному всеми участниками процесса согласования в СЭОДО.

      Заведите заявку в службу поддержки пользователей, чтобы назначить УЗСН в МЕТА.

      Роль ROLE_UDM_READER

      После получения ТУЗ добавьте ее в вашем пространстве Jenkins.Credentials c id: *meta-prom-creds.

      Внимание

      Для источников, использующих «коммунальные» подзадачи в пространстве Archiving, учетные данные уже определены и регистрировать собственную ТУЗ нет необходимости.

      При вызовах функции create() допускается использование как credentian id, так и формы "username:password".

      Важно

      ТУЗ для МЕТА не из AD. Этот только внутренний ТУЗ в АС «МЕТА».

    • Получение из SW META белых списков, размеченных источником в design-time.

    • Сопоставление логической модели ЛМ АС с физическим описанием данных (файлом дескриптора источника).

    • Формирование файла белого списка в установленном для Archiving формате на основании разметки ЛМ АС Whitelist из SW META.

  2. Работу по подготовке (структурировании в требуемом для Archiving формате) данных, поставляемых источником, и данных, полученных из ЛМ АС МЕТА.

  3. Работу по генерации AVRO — схем для Archiving для последующей доставки в КАП.

Данные, поставляемые непосредственно источником:

  • Файл дескриптора модели (физический). Файл может быть одного из трех вариантов:

    • Для источников, использующих классические java-классы (DTO) — файл дескриптора модели в формате JSON, генерируемый плагином Archiving в фазе сборки . Является выходным артефактом плагина. Имя файла соответствует шаблону .model.v..json.

    • Для источников, использующих Platform V Persistence — файл логической модели ЛМ АС в формате XML. Генерируется плагином eip-metamodel-scanner-maven-plugin. Является ресурсом источника.

    • Для источников на БПС — файл ЛМ АС в формате XML. Генерируется плагином generator-for-metamodel-plugin. Является ресурсом источника.

    • Файл белого списка. Это текстовый файл, опциональный в целевом варианте, т.к. в целевом варианте белый список извлекается задачей из ЛМ АС МЕТА. Обязательный для источника, который не имеет интеграции с ЛМ АС МЕТА. Шаблон имени файла: <имя_источника>.whitelist.v.<версия_белого_списка>, например, cdm.whitelist.v.0.0.1

      Примечание

      Передача ЛМ в виде файлов допустима для «старых» источников, в целях совместимости. Для всех вновь подключаемых источников целевой вариант — выгрузка ЛМ из МЕТА, выгрузку осуществит Archiving по указанному componentCode и списку версий моделей в параметрах вызова функции create().

      Это относится и к ДМ и к белому списку

  • Набор параметров конфигурирования Archiving.

Archiving реализует работу с АС МЕТА и КомпонентКод — переделка логики resolved.

Логика вычисления:

Если в параметрах функции configArchive.create(modelPath) передается путь к файлу с метамоделью и в параметре whiteListPath передан путь к файлу белого списка, Archiving возьмет для обработки эти файлы.

Если же в этих параметрах передан NULL, то по параметру componentCode Archiving обращается в АС МЕТА и запрашивает метамодель указанных в modelVersions версий и белый список.

Если запрос не вернет метамодель, методы Archiving не выполнятся и дальнейшая работа будет невозможна!

Если запрос успешен, то схемы АВРО будут генерироваться именно из полученной от МЕТА информации.

Внимание

Для источников пилота и первой волны требование не обязательно, Archiving обработает метамодель и белый список в виде файлов!

Подключение JSL сборки конфигурации Archiving#

Для сборки дистрибутива источника был разработан компонент библиотеки JSL.

Важно

Для источников, не интегрированных с МЕТА, необходимо, чтобы в рабочем пространстве сборочной задачи источника присутствовали файлы дескриптора модели и белого списка, имя которых задано согласно инструкции.

Результатом работы компонента библиотеки является появление следующих файлов в рабочем пространстве сборочной Jenkins-задачи, в которой подключена библиотека:

  • архив конфигурации (<имя источника>.model.<artifactVersion>.zip);

  • дескриптор версий version.info.yml;

  • AVRO-схемы для КАП.

Эти файлы должны быть помещены в папку /other/model дистрибутива источника.

Порядок подключения сборочной библиотеки#

  1. Зарегистрируйте библиотеку в Jenkins (если это еще не сделано).

  2. В groovy-скрипте сборочной Jenkins-задачи перед инициализацией узла добавьте строку:

    @Library('tsajlib') _
    

    При отсутствии доступа к библиотеке обратитесь к команде Archiving. Проверить доступность можно прямым обращением к системе версионирования.

  3. Добавьте вызов компонента библиотеки сборки архива

    configArchive.create(sourceDescriptionPath, modelPath, whiteListPath, artifactVersion, modelVersion, whiteListVersion,auditCredId )  **DEPRECATED - работа  с файлами ЛМ и БС
    

    либо

    configArchive.create(sourceDescriptionPath, modelPath, whiteListPath, artifactVersion, dataModelVersion , listVersion , LinkedHashMap dataModelVersions, auditCredId = "SBT-SA-TSPI001A"  ,  metaCredId = "meta-prom-creds" )
    

    В данном случае Archiving будет работать с данными, запрошенными из МЕТА ЛМ и БС, где:

    • sourceDescriptionPath — путь к файлу sourceDescription.yml в сборочной директории задачи, куда подключается компонент (пример: ${WORKSPACE}/tmp/sourceDescription.yml);

    • modelPath — путь к файлу дескриптора модели в сборочной директории задачи, куда подключается компонент (пример: ${WORKSPACE}/tmp/ucp.model.v.0.0.1.json). В случае интеграции источника с МЕТА должно передаваться значение null;

    • whiteListPath — путь к файлу белого списка в сборочной директории задачи, куда подключается компонент (пример: ${WORKSPACE}/tmp/ucp.whitelist.v.0.0.2). В случае интеграции источника с МЕТА должно передаваться значение null;

    • artifactVersion — версия дистрибутива (Пример: 4.000.01).

      Важно

      Это значение будет использоваться для выставления флага при дальнейших вызовах функций tsa_validate и tsa_configure.

    • dataModelVersion — устаревшая форма, версия метамодели в виде строки X.X.X.

      Внимание

      Не используйте эту форму для новых источников. Она оставлена для совместимости с РКО ФИ.

    • dataModelVersions — версия моделей в формате LinkedHashMap. Пример: ["spr":"4.8.3","base_model":"0.1.2.3"], при этом параметр modelVersion нужно установить в значение null.

    • whiteListVersion — версия белого списка.

    • auditCredId — учетные данные для аудита событий, согласно требованиям безопасности. Учетные данные должны существовать в проектной области, из которой запускается ваша подзадача.

      Если вы пользуетесь эталонными подзадачами Archiving, запускаемыми из пространства Archiving, то там сработает значение по умолчанию и параметр заполнять необязательно. Если вы используете методы Archiving в своих подзадачах в своем пространстве, то нужно определить этот параметр. Можно использовать те же учетные данные, под которыми вы обращаетесь в Нексус.

    • metaCredId — учетные данные для вычитывания метамодели из МЕТА. Учетные данные должны существовать в проектной области, из которой запускается ваша подзадача. По умолчанию используются учетные данные "meta-prom-creds". Они определены в проектной области Archiving, из которой запускаются "коммунальные" подзадачи. Вместо учетных данных допускается использовать форму "username:password".

2. Подключение компонента генерации AVRO-схем#

Важно

Необходимо, чтобы в рабочем пространстве сборочной задачи источника присутствовали файлы дескриптора модели и белого списка, названные согласно инструкции.

Результатом работы компонента библиотеки является появление следующих файлов в рабочем пространстве сборочной Jenkins-задачи, в которой подключена библиотека:

  • model_schemas_full.txt — содержит avro-схемы классов из дескриптора модели;

  • model_schemas.txt - содержит только те avro-схемы классов из дескриптора модели, которые присутствуют в белом списке. Если в архиве не будет файла белого списка, то в данном файле будет отображена ошибка, а сам файл в дальнейшем не пройдет валидацию.

В случае если в процессе генерации схем возникнет ошибка, ее стектрейс будет записан в выходные файлы, а сборка завершится неудачей.

Эти файлы должны быть помещены источником в папку /other/model дистрибутива источника.

Порядок подключения библиотеки генерации AVRO-схем

  1. В groovy-скрипте перед инициализацией узла добавьте следующую строку в сборочной Jenkins-задаче:

    @Library('tsajlib') _

  2. Добавить вызов компонента генерации схем для архива:

avroSchemas.generate(credId, sourceName, modelFilePath, verboseKey, whiteListFilePath,sdPath)

Описание параметров:

Параметр

Описание

Пример использования

credId

Идентификатор учетных данных, которые будут использоваться для скачивания файла, реализующего в себе логику генерации avro-схем

SBT-SA-TSPI001A

sourceName

Мнемоника источника

GBK

modelFilePath

Путь до файла, содержащего описание модели источника. Если не в текущем каталоге, то с путем;

Примечание: для источников БПС (modelType=**EIP_METAMODEL), в случае если в использовании только базовые типы и нет никаких расширений, в этом параметре укажите "UNUSE", так как для БПС Archiving уже прогрузил базовую модель и если дополнительно прогружать ее от отдельных источников, то получим Duplicate key (PPRBARCH-4030)

gbk-main.model.v.0.1.json

verboseKey

Признак формирования вербального ключа (использование префикса имени класса в идентификаторе). Имеет значения true/false (из раздела «Состав дистрибутива конфигурации источника»). Значение verboseKey присутствует в каждой avro-схеме в словаре ReferenceProps поля doc. Рекомендуется указывать false. В случае необходимости использования true - дополнительно проконсультируйтесь с командой Archiving

false

whiteListFilePath

Примечание: Если ранее вызывался методом configArchive.create() без указания имен файлов модели и вайтлиста, то файлы будут созданы автоматически с шаблонными именами, например, .whitelist.v.

cdm.whitelist.v.0.0.1

sdPath (Опциональный)

Отноcительный (от WORKSPACE) путь до файла SourceDescription.yml (указывается, если zip диcтрибутива содержит много модулей и каждый каталог содержит свой SourceDescription.yml)

sber-crm-deals-journal/

Примечание

Генерация схем производится внешним кодом "schema-generator-from-metamodel-0.0.XX.jar"

R-Pipeline. ИФТ. Валидация дистрибутива источника#

Функциональность валидации дистрибутива источника реализована в виде Jenkins-библиотеки (далее по тексту - JSL). Валидатор можно запускать как в виде этапа DPM, так и при сборке дистрибутива непосредственно в сборочных Job.

Внимание:

Валидация дистрибутива не вносит никаких изменений в сборку.

Назначение функциональности:

  • Валидация дистрибутива источника в части наличия в дистрибутиве источника артефакта с параметрами конфигурации Archiving.

  • Проверка корректности параметров и значений в этом артефакте.

Критерии применимости дистрибутива источника:

  1. Наличие необходимых файлов (белого списка, дескриптора модели (либо файла/файлов ЛМ АС), файла параметров источника, дескриптора версий, avro-схем для КАП)

  2. Валидность имеющихся файлов (форматный контроль):

    • Файлы схемы данных (дескриптора модели Archiving) валидируются чтением, по JSON-схеме и накачкой (должны быть валидны).

    • Файлы схемы данных КАП (model_schemas) проверяются на наличие и валидируются на корректность содержимого попыткой прочтения.

    • Белые списки проверяются по формату согласно спецификации.

    • Файл конфигурации YAML проверяется на существование, корректность и наличие всех требуемых параметров (существуют и не пустые).

  3. Пригодность файлов для реальной работы (применяются специально написанные на Java артефакты валидации); для дескриптора модели проверяется:

    • соответствие формата типу логической модели (ЛМ АС SW META) в YAML-файле и возможность объективно сгенерировать классы;

    • соответствие сгенерированных классов имеющемуся белому списку в поставке.

Фаза валидации необходима для того, чтобы выполнять проверку дистрибутива источника, прежде чем выполнять попытку конфигурирования с использованием параметров из дистрибутива, на реальном экземпляре Archiving на полигоне.

В зависимости от результата проверки проставляются флаг QG pprbod.ift (статусы ok_need_config либо ok_skip_config) в пространстве сервиса QualityGateManager.

В фазу настройки Archiving R-Pipeline ИФТ Фаза конфигурирования инстанса Archiving встраивается проверка наличия флага pprbod.ift = ok_skip_config, и при отсутствии соответствующего флага фаза выполнена успешно не будет. Помимо этого, DPM конвейр должен быть настроен на обязательное наличие флага успешной проверки дистрибутива для перехода на дальнейшие стадии.

Подключение JSL валидации дистрибутива конфигурации Archiving#

Внимание:

При работе на этапах сборки (не в DPM) необходимо, чтобы к моменту вызова метода валидации уже имелся собранный архив дистрибутива источника. Если работа идет в составе Job сборки и дистрибутив уже расположен в воркспейсе сборочной Job, то можно оптимизировать процесс, исключив скачивание и распаковку дистрибутива.

Подключение библиотеки валидации#

Перед началом работы необходимо зарегистрировать библиотеку в Jenkins (если это еще не сделано).

Подключение библиотеки для тестовой разработки#

На этапе разработки и тестовых запусков, а также пока не оформлены аккаунты для обращения к СУДИР, можно воспользоваться отладочной веткой, не выставляющей флаги в QGM и не выполняющей соединения с СУДИР, а просто логирующей информацию о флагах.

Для создания тестовой ветки подключите библиотеку:

@Library('tsajlib@test_branch_without_flag') _

При вызовах валидатора можно воспользоваться упрощенной формой, опустив часть параметров (будут использованы значения по умолчанию). Пример:

distributive.tsa_validate(sourceName, polygonName, distributivePath, artifactVersion, nexusArtifactId, tuz)

На этапе обращения к СУДИР и HTTP Bridge в тестовой ветке для проверок версий будет выводиться ошибка:

❖ ===INFO: Getting CSRF token...                                 
...                                                                    
❖ ===ERROR: =======Ошибка обращения к API HttpBridge           
...                                                       
java.lang.NullPointerException: Cannot invoke method getAt() on null object

Если в логах ваших Job появится данный текст - значит все, что можно получить с тестового бранча, вы уже получили, основные этапы валидации конфигурации пройдены, и пора возвращаться к основной ветке master tsajlib библиотеки.

Целевой вариант подключение библиотеки#
@Library('tsajlib') _

Вызов компонента библиотеки валидации дистрибутива#

Добавить вызов:

distributive.tsa_validate(sourceName, polygonName, distrPath, artifactVersion, nexusArtifactId, tuz, sudirUserId, String nexusGroupId = 'Nexus_PROD')

где:

Параметр

Описание

sourceName

Мнемонический идентификатор источника

distrPath

Путь к архиву дистрибутива источника. Это именно PATH (не URL), то есть при вызове tsa_validate вы должны обеспечить наличие файла в пределах файловой системы.

artifactVersion

Версия дистрибутива источника. Важно: значение должно совпадать с тем, что использовалось при вызове configArchive.create() при сборке дистрибутива, так как это значение требуется для выставления флага.

nexusArtifactId

Идентификатор артефакта источника в Нексусе, используется только для простановки флага pprbod в QGM в пространстве источника.

polygonName

Идентификатор полигона, на который устанавливается дистрибутив (сейчас доступны варианты IFT_RB и IFT_KB, также допускается использование имен DENON и BOSTON cоответственно именам стендов.

tuz

id технологической учетной записи источника для возможности выставления флагов из пространства источника (креды должны быть добавлены в хранилище Jenkins и иметь тип "Username with password"). Также допускается использование строки вида "username:password".

sudirUserId

id учетных данных УЗ источника для авторизации в СУДИР (креды должны быть добавлены в хранилище Jenkins). Также допускается использование строки вида "username:password". Данная учетная запись должна иметь права на выполнение HTTP-запросов к HttpBridge. Для этого необходимо добавить ей такие права, оформив заявку согласно инструкции.

nexusGroupId

groupId фабрики в Nexus. Нужен КАПу для простановки в пространстве QGM источника флагов pprbod_ift_ready и pprbod_ift_valid. Значение по умолчанию: Nexus_PROD.

Примечание:

Для параметров tuz и sudirUserId допускается возможность вводить либо креденшн либо пару "user:password". Признаком того, что используется метод авторизации "user:password", является наличие в параметре символа ':', соответственно в пароле не должен использоваться этот символ (':')

Правильный nexusGropuId можно взять из ваших сборочных конвейеров DPM, зайти в свой проект, нажать "шестеренка" в правом верхнем углу и "Редактировать сервис", на открывшейся странице выписать поле:

Пример тестовой Jenkins Job для запуска валидатора:

Можно работать непосредственно с этой Job в пространстве Archiving, либо можно взять ее за образец и перенести код в свое пространство:

Внимание:

Базовая ветка библиотеки - tsajlib@master. При сбоях работы есть возможность выбрать другую ветку - в параметрах запуска Job выберете "запасную" ветку tsajlb@ift_hotfix (параметр параметр TCA_BRANCH):

Результат работы валидатора#

Итогом работы компонента валидации дистрибутива источника является появление в пространстве QGM:

  1. Промежуточные флаги, результат валидации. Возможные значения флага pprbod.ift:

    • ok_miss_avro (валидация архива конфигурации Archiving прошла успешно, однако отсутствуют либо невалидны AVRO-схемы для КАП);

    • err (архив конфигурации Archiving невалиден).

  2. Cтатусы флага pprbod.ift успешной валидации определяют необходимость дальнейшего конфигурирования источника в Archiving. Возможные значения:

    • ok_need_config (переконфигурирование источника необходимо, поскольку изменились ключевые параметры конфигурации);

    • ok_skip_config (нет необходимости дальнейшего конфигурирования источника, т.к. ключевые параметры конфигурации не изменились, в тело флага (body) вписана строка = номер ранее установленной версии вида "D-XX.XXX.XX").

Возможные ошибки валидации#

При валидации белого списка возможны ошибки, cвязанные с некорректным синтаксисом. Библиотека имеет возможность построчного анализа белого списка. При этом в лог выводится каждая строка белого списка и результат проверки этой строки - true или false. true - строка валидна, false - строка с нарушением структуры.

Для включения данной возможности подключайте ветку библиотеки:

@Library('tsajlib@white_list_scan')

R-Pipeline. ИФТ. Фаза конфигурирования инстанса Archiving#

Внимание:

По инструкции tsa_configure, получив флаг pprbod.ift=ok_skip_config, просто завершает работу. Сейчас действует временный порядок: tsa_configure, получив флаг pprbod.ift=ok_skip_config, ничего не делая, выставляет флаг pprbod.ift=ok. После доработок source-provider данный режим будет выключен и восстановлен в соответствии с инструкцией.

Функциональность конфигурирования Archiving на основе дистрибутива источника реализована в виде Jenkins-библиотеки (далее по тексту - JSL).

Внимание:

Прикладные фабрики, которые не используют для интеграции с Archiving DataSpace, БПС.Сделки, Mapper или Универсальный вектор ПЖ, и для обработки данных требуют добавления плагина десериализации данных в состав Archiving, выполняют данный шаг только после подтверждения факта выхода релиза Archiving, содержащего в своей кодовой базе плагин десериализации данных, на контур ИФТ. |

Внимание:

Конфигурирование не вносит никаких изменений в сборку и не выполняет инсталляцию/развертывание модулей. Основной технологический процесс развертывания дистрибутива, кроме включения в задание развертывания команды для вызова конфигурирования, не затрагивается.

Назначение функциональности: конфигурирование Archiving для работы с источником выпускаемой версии. Конфигурирование выполняется на основе артефакта, включенного в дистрибутив источника. Задача конфигурирования выполняется только в том случае, если выставлен флаг Quality Gate pprbod_ok_need_config. Наличие такого флага означает, что:

  1. Дистрибутив успешно прошел валидацию.

  2. Содержание дистрибутива отличается от текущей конфигурации источника (либо текущая конфигурация отсутствует, т.е. источник впервые подключается к Archiving).

Конфигурирования экземпляра Archiving#

Для того чтобы встроить задачу конфигурирования экземпляра Archiving, необходимо выполнить следующие действия:

Перед началом работы зарегистрируйте библиотеку в Jenkins (если это еще не сделано).

Подключение библиотеки для тестовой разработки#

На этапе разработки и тестовых запусков, а также пока не оформлены аккаунты для обращения к СУДИР, можно воспользоваться веткой, не выставляющей флаги в QGM, а просто логирующей информацию о флагах.

Для подключения тестовой ветки подключите библиотеку:

@Library('tsajlib@test_branch_without_flag') _

Целевой вариант подключения библиотеки#

@Library('tsajlib') _

Вызов компонента библиотеки JSL конфигурирования дистрибутива осуществляется следующим образом:

distributive.tsa_configure(sourceName, polygonName, distrArchivePath, tuz, sudirUserId, nexusGroupId) |

где:

Параметр

Описание

sourceName

Мнемонический идентификатор источника.

distrArchivePath

Путь к архиву дистрибутива источника. Важно: для предотвращения повторного скачивания и распаковки дистрибутива (например, если метод конфигурирования вызывается в том же Pipeline, что и метод валидации, и после валидации распакованное содержимое архива не удалялось) то в качестве значения distrArchivePath должна быть передана строковая константа UNPACKED. Если же метод конфигурирования вызывается не в том же Pipeline, что метод валидации, то в качестве значения этого параметра передается путь к архиву дистрибутива источника (например, "${WORKSPACE}/gbk-main.1.0.0.zip").

polygonName

Идентификатор полигона, на который устанавливается дистрибутив (сейчас доступны варианты IFT_RB и IFT_KB, также допускается использование имен DENON и BOSTON cоответственно именам стендов.

tuz

id технологической учетной записи источника для возможности выставления флагов из пространства источника (креды должны быть добавлены в хранилище Jenkins и иметь тип "Username with password" (таково требование со стороны QGM)), допускается передача в виде cтроки "username:password".

sudirUserId

id учетных данных УЗ источника для авторизации в СУДИР (учетные данные должны быть добавлены в хранилище Jenkins).

nexusGroupId

groupId фабрики в Nexus. Нужен КАП для простановки в пространстве QGM источника флагов.

Правильный nexusGropuId можно взять из ваших сборочных конвейеров DPM, зайти в свой проект, нажать "шестеренка" в правом верхнем углу и "Редактировать сервис", на открывшейся странице выписать поле:

Пример jenkins Job для использования в тестировании и в качестве настройки DPM:

Внимание:

Базовая ветка библиотеки - tsajlib@master. При сбоях работы есть возможность выбрать другую ветку - в параметрах запуска Job можно выбрать "запасную" ветку tsajlb@ift_hotfix (параметр TCA_BRANCH):

Итогом работы компонента является появление в пространстве QGM флага, определяющего результат.

Возможные значения:

Флаг

Cтатус флага

Описание

pprbod.ift

ok

Конфигурирование прошло полностью успешно, т.е. в дистрибутиве присутствуют и валидны файлы, необходимые и Archiving, и КАП).

pprbod.ift

err

Конфигурирование Archiving закончилось неудачей.

Конфигурация источника сохранена в БД Archiving для того, чтобы сервер начал обрабатывать поток данных. Сервис Archiving нужно перезапускать.

7. R-Pipeline. ИФТ. Smoke-Regress тесты#

Назначение задачи - проведение Smoke-regress автоматизированных тестов с целью проверки корректности работы источника на Archiving после применения обновленной модели источника и настроек. Задача тестов - проверить обратную совместимость, работоспособность репликации после обновления модели.

Данные тесты не являются предметом согласования с КАП и реализуются по методикам автотестов, разрабатываемым Archiving для задач regress тестирования в режиме полной автоматизации.

Для реализации задачи разрабатывается унифицированный тестовый Pipeline, который может быть запущен на любом полигоне ИФТ. Паплайн пока находится в стадии активной разработки.

Запуск Smoke-regress тестов выполняется до публикации avro-схем в КАП.

В случае ошибки тестов схемы в КАП не публикуются, подготовка дистрибутива КАП не инициируется.

Обязательным условием дальнейшего движения дистрибутива источника далее по конвейеру является успешное прохождение фазы Smoke-Regress тестирования.

По результату тестирования проставляется Quality Gate флаг pprbod.smoke.regress.

Флаг может принимать следущие статусы:

Статус флага

Описание

ok_smoke_regress

Успешно пройдены тесты озволяет дистрибутиву источника двигаться дальше по конвейру и устанавливаться на следующий полигон, в случае полигона ИФТ - на полигон ПСИ, в случае полигона ПСИ - допускает вывод в ПРОМ. Отсутствие такого флага не препятствует установке дистрибутива источника на полигон ИФТ, но запрещает его дальнейшую установку на полигоны и в ПРОМ.

err_smoke_regress

Тесты полностью или частично провалены.

Работа с клиентской библиотекой Archiving#

Ограничения, накладываемые на реализацию источником API Archiving#

  • От источника API оффлайн ТКД должен реализовывать только один модуль.

  • Допускается, что этот модуль может иметь несколько шард (в 3 поколении - иметь несколько экземпляров в разных зонах ММТ), в этом случае Archiving последовательно опрашивает все шарды при оффлайн ТКД. Список зон ММТ, в которых эти шарды расположены, в этом случае предварительно конфигурируется при помощи DevOps-конвеера (как при конфигурировании передавать в Archiving данные о своем модуле и в каких зонах он расположен описано в статье Состав дистрибутива конфигурации источника).

API Init (целевой)#

Базовые принципы построения целевого API Init:

  • Передача данных по определенному типу.

  • Передача данных партициями, размер партиции определяется динамически (фабрикой).

  • Передача данных выполняется в объеме всей первоначальной выгрузки потоковым образом с разбиением на партиции со стороны источника.

  • Для передачи партиции источник разбивает ее на пакеты (батчи) и отправляет эти пакеты в Archiving.

  • Для передачи одной партиции используется один асинхронный со стороны Archiving и необходимое количество синхронных вызовов API загрузки пакета (части партиции) со стороны источника.

  • Обработка последующих партиций определяется результатом (кодом ответа фабрики) на запрос получения данных текущей партиции.

Ключевое - это схема импорта потоком, с фиксацией последнего полученного от ПФ при обработке данных идентификатора объекта и инициировании запроса на следующую партицию с этого идентификатора.

Синхронный API Init V1 (Устаревшее API для источников, использующих 3-ю версию Archiving)#

@Api
public interface InitDataTransportApi {
@ApiMethod(apiName = "initLoad", version = "0.0.1")
String initLoad(String type);
@ApiMethod(apiName = "getBatchCount", version = "0.0.1")
EstimateResult getBatchCount(String loadingId) throws BatchEstimateException;
@ApiMethod(apiName = "loadBatch", version = "0.0.1")
LoadResult loadBatch(String loadingId, int index) throws BatchLoadException;
@ApiMethod(apiName = "abort", version = "0.0.1")
void abort(String loadingId);
}

Описание методов InitDataTransportApi#

Список методов:

  • initLoad - начать выгрузку по типу type, вернется идентификатор выгрузки;

  • getBatchCount - получить количество  партиций для указанного типа. loadingId - идентификатор выгрузки, полученный в методе initLoad;

  • loadBatch - получить парицию с индексом index для идентификатора выгрузки loadingId. Предполагается итерирование и вызов этого метода для индексов от нуля до количества партиций, полученного в getBatchCount. Результат - набор записей, относящийся к получаемой партиции;

  • abort - уведомить ПФ, что Archiving больше не будет запрашивать партиции и можно освободить ресурсы, занятые для подготовки данных для выгрузки с идентификатором loadingId.

Синхронный API Init V2 (Целевое API для источников, использующих 4-ую версию Archiving)#

@Api                                           
public interface InitDataSampleApi {                      
@ApiMethod(apiName = "initLoad", version = "0.0.1")            
String initLoad(String type);                                          
@ApiMethod(apiName = "getBatchCount", version = "0.0.1")                     
EstimateResult getBatchCount(String loadingId) throws BatchEstimateException;         
@ApiMethod(apiName = "loadBatchAsync", version = "0.0.1")                              
void loadBatchAsync(String loadingId, int index, String requestId) throws BatchLoadException;
@ApiMethod(apiName = "abort", version = "0.0.1")                                     
void abort(String loadingId);                                                       
}                                                                               
@Api                                                                           
public interface InitDataSampleLoad {                                             
@ApiMethod(apiName = "loadInitBatch", version = "0.0.1")                         
void loadInitBatch(InitBatchResult initBatchResult) throws PartitionResultAcceptException;
}                                                                            

Методы InitDataSampleApi (реализуется на стороне источника)#

Описание методов:

  • initLoad - начинает выгрузку по типу type, возвращает идентификатор выгрузки;

  • getBatchCount - возвращает количество партиций для выгрузки, начатой вызовом initLoad();

  • loadBatchAsync - инициирует отправку партиции с индексом index для идентификатора выгрузки loadingId. Предполагается итерирование и вызов этого метода для индексов от нуля до количества партиций, возвращенного getBatchCount(). Параметр requestId имеет смысл глобально уникального идентификатора, значение должно коррелировать с InitBatchResult#requestId в последующем вызове InitDataSampleLoad.loadInitBatch(InitBatchResult) на стороне Archiving;

  • abort - освобождает все ресурсы, необходимые для отправки данных, относящихся к выгрузке с идентификатором loadingId. Выполнив вызов abort(), Archiving больше не планирует получать пакеты этой выгрузки.

Описание методов InitDataSampleLoad (реализуется на стороне Archiving)#

  • loadInitBatch - загружает пакет, отправленный источником; отправка должна быть ранее успешно инициирована вызовом InitDataSampleApi.loadBatchAsync() на стороне источника.

Оценка#

/**                                                         
* Результат оценки размера партиции                             
*/                                                              
public class EstimateResult implements Serializable {        
private static final long serialVersionUID = -5906616942515441183L;
private final PartitionEstimateCode estimateCode;     
private final Integer size;                             
}                                                      
/**                                                        
* Возможность оценки                           
*/                                           
public enum PartitionEstimateCode {                    
/**                                              
* ПФ не умеет оценивать по списку ключей размер партиции
* (используется только при ТКД, в Init недопустимо возвращать)
*/                                                   
PS_DEFAULT,                                     
/**                                                    
* ПФ оценила размер портиции и подготовила данные для выгрузки
*/                                                 
PS_ADAPTIVE_READY,                                 
/**                                                 
* ПФ подготавливает данные                      
*/                                           
PS_ADAPTIVE_PENDING;                       
}                                                  
/**                                        
* Базовый класс для загрузки пакета данных со стороны источника
*/                                                     
public abstract class PartitionResult<T extends Serializable> implements Serializable {
private static final long serialVersionUID = -5280480427310163519L;
private final long responseId; // ID пакета (части ответа)       
private final String requestId; // ID запроса с которым коррелирует ответ  
private final Timestamp responseTimestamp; // Время формирования пакета
private final int partTotal; // Количество пакетов, на которые разделен ответ (N)
private final int partCurrent; // Номер текущего пакета (0..N-1)
private final String partHash; // Хеш текущей части ответа        
private final String entryType; // Тип объекта                  
private final String zoneId; // ММТ зона иcточника откуда отправлены объекты  
private final T data; // Фрагмент контейнера LoadResult - полезная нагрузка                                
private final String hash; // Хеш всего образца для проверки на стороне приемника после получения всех партиций
...                                                                                                 
}                                                                                                
public class InitBatchResult extends PartitionResult<byte[]> {                                  
private static final long serialVersionUID = -1110331909003562897L;                                    
public InitBatchResult(long responseId, String requestId, Timestamp responseTimestamp,                      
int partTotal, int partCurrent, String partHash, String entryType, String zoneId,                             
byte[] data, String hash) {                                                                                   
super(responseId, requestId, responseTimestamp, partTotal, partCurrent, partHash, entryType, zoneId, data, hash);
}                                                                                                        
}                                                                                                 

Передача данных#

/**                                
* Результат загрузки партиции                               
*/                                                          
public class LoadResult implements Serializable {            
private static final long serialVersionUID = 7608272960976480122L;
private final ResultCode code;                            
private final List<DataContainer> dataContainers = new ArrayList<>();
}                                                    
/                                                              
* Контейнер данных                                                
*/                                                                  
public class DataContainer implements Serializable {             
private static final long serialVersionUID = 3239401317382698849L;
private String key; //id объекта ПФ                      
private String entryType; // тип объекта                         
private Long version; // версия модели данных объекта             
private OperationType operType; // тип операции(для ТКД - create)  
private List<String> updAttrs; // список обновляемых аттрибутов  
private byte[] data; //сериализованый объект ПФ                  
}                                                            
/                                                             
* Тип операции в случае векторов изменений                
*/                                                            
public enum OperationType {                            
CREATE, UPDATE, DELETE;                                
}                                                            

Процесс offline ТКД с использованием генерации образцов на стороне источника#

Offline ТКД с генерацией образцов на стороне источника - процесс, позволяющий выявлять данные источника, которые в силу каких-либо обстоятельств не попали в КАП. Механизм позволяет выполнять двухстороннюю сверку и выявлять данные, как отличающиеся, неактуальные (отсутствующие в источнике), так и отсутствующие в КАП.

Терминология#

Список терминов:

  • Перечень типов - список типов, по которым со стороны КАП выполняется процедура сверки источника и реплики.

  • Дата загрузки - бизнес-дата, по которой производится сверка.

  • Образец идентификаторов - перечень идентификаторов, сформированный источником по одному конкретному запрошенному типу на запрошенную дату загрузки (loading_date). В образец идентификаторов включаться должны фабрикой-источником только объекты, существующие на эту дату.

  • Образец источника - перечень объектов, сформированный источником по образцу идентификаторов.

Все сообщения протокола, использующие криптографический хеш, подразумевают использование алгоритма sha256. Если не оговорено иное - хеш-сумма рассчитывается от всего контейнера полностью.

API КАП для offline ТКД с построением образца на стороне источника#

Взаимодействие с КАП выполняется через интеграционную Kafka Archiving, для обмена информацией используется топики:

  • _offdq - для запросов к Archiving. В топик со стороны КАП должны помещаться сообщения типа PprbOffdqKeyContainer - запрос на получение полных версий объектов по списку идентификаторов.

  • _offdq_response - топик ответов Archiving. В него со стороны Archiving должны помещаться сообщения типа PprbTransportContainer - универсальный контейнер, содержащий объекты ПФ.

Типы сообщений PprbTransportContainer не меняются. PprbTransportContainer дополняется опциональной зоной, которая заполняется в случае, если передаваемый идентификатор ранее был получен от источника в эту итерацию ТКД по семплу.

Формат сообщения PprbOffDqKeyContainer - дополняется идентификатором зоны

Транспортный контейнер ТКД запроса:

{
"name": "PprbOffDqKeyContainer",                                                                                                                           
"namespace": "com.sbt.pprbod.avro.journal.classes",                                                                                                       
"type": "record",                                                                                                                                         
{"name": "key", "type": {"type": "array", "items": "string"} }, // Массив ключей, по которым необходимо получить полные версии объектов из системы-источника
{"name": "entry_type", "type": "string"}, // Тип объекта                                                                                               
{"name": "global_type", "type": "string"} // Глобальный тип. Является атрибутом транспортного контейнера с собщением (не заполняется, нужно будет удалить)
{"name": "zone_id", "type": ["null", "string"]} // Идентификатор зоны. Заполняется в случае если идентификатор получен из образца источника и таким образом известен
}

Формат сообщения об ошибке PprbErrorContainer

Транспортный контейнер сообщений ошибок:

{
"name": "PprbErrorContainer",                                                                                                                                                        
"namespace": "com.sbt.pprbod.avro.journal.classes",                                                                                                                           
"type": "record",                                                                                                                                                                    
"fields":
[                                                                                      
{"name": "data_type", "type": "string" }, // "PprbData_1.0"                                                                                                               
{"name": "message_id", "type": "string"}, // Id сообщения, при обработке которого возникло исключение                                                                          
{"name": "interaction_type", "type": "string"}, // тип взаимодействия при котором случилась ошибка: STREAM - поток, DQ - Решение инцидента качества, QFFDQ - Решение запроса ТКД
{"name": "exception_code", "type": "string"}, // Код ошибки обработки контейнера (по классифиактору Archiving)                                                                       
{"name": "exception_timestamp", "type": "long"}, // время когда ошибка зафиксирована                                                                                 
{"name": "exception_trace", "type": ["null","string"]}, // Опциональный стектрейс                                                                                           
{"name": "exception_message", "type": ["null","string"]}, // Опциональное инфосообщение                                                                                      
{"name": "id_sample", "type":{ // Массив идентификаторов либо идентифиактор, обработка которых не удалась  
"type": "array",
"items": "string"
}                                                                                                                                                                            

API КАП для онлайн ТКД#

Взаимодействие с КАП выполняется через интеграционную Kafka Archiving, для обмена информацией используется топики:

  • _dq - для запросов  к Archiving. В топик со стороны КАП могут помещаться сообщения типа PprbKeyContainer - запрос на получение полной версии объекта по списку по типу идентификатору и зоне.

  • _dq_response - топик ответов Archiving. В него со стороны Archiving могут помещаться сообщения типа PprbTransportContainer - универсальный контейнер, содержащий объекты ПФ.

Типы сообщений PprbTransportContainer не меняются.

Формат сообщения PprbKeyContainer (запрос онлайн ТКД)

Транспортный контейнер ТКД запроса:

{
"name" : "PprbKeyContainer" ,
"namespace" : "com.sbt.pprbod.avro.journal.classes",
"type" : "record" ,
"fields" :[                                                                                                                                                                            { "name" : "key" , "type" : "string" }, // Ключ объекта, по которому зарегистрирован инцидент ТКД
{ "name" : "entry_type" , "type" : "string" }, // Тип объекта  
{ "name" : "zone_id" , "type" : "string" }, // Идентификатор зоны Stand-In ZoneID. Является атрибутом транспортного контейнера с собщением. Для источников с репликацией через OGG в данном атрибуте содержится "Global name" инстанса
]
}
}  

API Прикладной фабрики для оффлайн ТКД#

API 3 поколения#

Api 3 поколения - спецификация Java API для процесса оффлайн ТКД  для использования с ММТ. Является расширением и дополнением существующего API офлайн ТКД, но с поддержкой возможности дробления ответа ПФ на партиции.

Соглашения о сериализации#

Рекомендуется использование kryo версии 4.0.2 и класс com.sbt.pprbod.data.utils.KryoUtils, в котором уже сделаны все необходимые настройки.

В случае ограничений, делающих невозможным использование утилит Archiving для сериализации возможно использование собственного формата сериализации путем реализации интерфейса com.sbt.pprbod.common.api.TkdObjectDeserializer.

Примечание:

При реализации собственного десериализатора и использовании в нем Kryo необходимо отключать регистрацию классов, т.е не использовать com.esotericsoftware.kryo.Kryo#register(java.lang.Class), т.к. порядок обхода при регистрации на стороне источника и на стороне Archiving не детерменирован и в случае отличия это приводит к ошибкам сериализации.

Если для отправки журналов в ПЖ используется универсальный вектор ПЖ, то формировать объекты для Init и ткд также необходимо в формате универсального вектора, преобразованного в массив байт.

Соглашения о версионировании объектов#

ПФ ведет уникальное на своей стороне версионирование объектов. В случае изменения структуры (модели данных) объекта - должна монотонно увеличиваться и его версия.  Версионирование ведется в разрезе каждого типа объекта.

Если ПФ не поддерживает ведение версионирования модели данных, в качестве версии модели данных всегда передается 0.

Важно. Ведение версии модели данных может сказаться на возможности Archiving понять какую схему преобразования следует использовать, что для КАП в дальнейшем может помешать правильно интерпретировать полученные данные, а в Archiving - повлечь невозможность их правильной сериализации в плоскую модель.

Типы данных и интерфейсы для получения образца идентификаторов#

Структуры для запроса образца идентификаторов:

public abstract class PartitionResult<T extends Serializable> implements Serializable {
private static final long serialVersionUID = -5280480427310163519L;                    
private final long responseId; // ID партиции ответа                                         
private final String requestId; // ID запроса с которым коррелирует ответ                     
private final Timestamp responseTimestamp; // Время формирования части ответа                         
private final int partTotal; // Количество партиций на которые разделен ответ (N)                 
private final int partCurrent; // Номер текущей партиции (0..N-1)                                     
private final String partHash; // Хеш текущей части семпла для проверки на стороне приемника             
private final String entryType; // Тип объекта                                                             
private final String zoneId; // ММТ зона иcточника откуда отправлены объекты                     
private final T data; // Фрагмент контейнера LoadResult - полезная нагрузка                     
private final String hash; // Хеш всего образца для проверки на стороне приемника после получения всех партиций
...                                                                                               
}                                                                                                   
public class SampleResult extends PartitionResult<ArrayList<String>> {                   
private static final long serialVersionUID = 370903970994546191L;                                 
protected SampleResult(long responseId, String requestId, Timestamp responseTimestamp,               
int partTotal, int partCurrent, String partHash, String entryType, String zoneId,                    
ArrayList<String> data, String hash) {                                                          
super(responseId, requestId, responseTimestamp, partTotal, partCurrent, partHash, entryType, zoneId, data, hash);
}                                                                                                  
}                                                                                                 

Отличается способ использования транспортного контейнера. Если в исходном API ТКД (синхронный) источник реализовывал на своей стороне интерфейс public interface QualityDataTransportApi, в текущем API применяется иной подход:

  1. Для получения образца идентификаторов источник реализует на своей стороне API public interface QualityDataSampleApi, а именно - метод getQualitySample который должен на стороне источника построить срез в виде списка идентификаторов и подготовить ответ, вернув соответствующий код.

  2. Ответ в формате потока семпла идентификаторов источник отправляет на API TCA public interface QualityDataSampleLoad, а именно - метод loadQualitySample - потоковый, принимающий одну и более партицию семпла идентификаторов в количестве partTotal.

Устанавливается таймаут, в течение которого Archiving ожидает как получение порции данных семпла после вызова getQualityDataSample, так и получение следующей партиции при ее наличии. По умолчанию таймаут составляет 10 минут.

Интерфейс и методы для формирования списка идентификаторов по типу

Интерфейс для получения списка идентификаторов по типу:

public interface QualityDataSampleApi {
...
@ApiMethod(apiName = "getQualitySample", version = "0.0.1")
void getQualitySample(String type, String requestId, int sampleThreshold) throws QualityDataSampleException;

Интерфейс QualityDataSampleApi реализуется источником.

Интерфейс и методы для получения потока идентификаторов по типу (образца)

Интерфейс для получения списка идентификаторов по типу:

**public** **interface** QualityDataSampleLoad {
...                                                                                             
@ApiMethod(apiName = "loadQualitySample", version = "0.0.1")                                
void loadQualitySample(SampleResult sampleResult) throws PartitionResultAcceptException;
...                                                                                             
}                                                                                              

Интерфейс QualityDataSampleLoad. Реализуется на стороне Archiving, вызывается со стороны ПФ непосредственно. Фиксируется контракт, что в одном пакете SampleResult не может присутствовать идентификаторов, составляющих в совокупной длине более 1000 символов.

Типы данных и интерфейсы для получения результата запроса объектов (образца источника)#

Структуры для оценки размера образца:#

Транспортное сообщение запроса образца одного типа:

{**public** **enum** PartitionEstimateCode {
/**
* ПФ не умеет оценивать по списку ключей размер партиции
*/
PS_DEFAULT,
/**
* ПФ оценила размер портиции и подготовила данные для выгрузки
*/
PS_ADAPTIVE_READY,
/**
* ПФ подготавливает данные
*/
PS_ADAPTIVE_PENDING;
}
/**
* Результат оценки размера партиции
*/
public class EstimateResult implements Serializable {
private static final long serialVersionUID = -5906616942515441183L;
private final PartitionEstimateCode estimateCode;
private final Integer size;
}

Структуры для отдачи полных версий объекта (транспортный контейнер):#

Применяется та же структура что для первоначальной версии синхронного ТКД API ТКД (синхронный)

Транспортное сообщение запроса образца одного типа:

/**
* Результат загрузки партиции                                                        
*/                                                                                   
**public** **class** LoadResult **implements** Serializable {                         
**private** **static** **final** **long** serialVersionUID = 7608272960976480122L;    
**private** **final** ResultCode code;                                                
**private** **final** List<DataContainer> dataContainers = **new** ArrayList<>();
}                                                                                     
/**                                                                                 
* Контейнер данных                                                                   
*/                                                                                   
**public** **class** DataContainer **implements** Serializable {                      
**private** **static** **final** **long** serialVersionUID = 3239401317382698849L;    
**private** String key; //id объекта ПФ                                               
**private** String entryType; // тип объекта                                          
**private** Long version; // версия модели данных объекта                             
**private** OperationType operType; // тип операции (для ТКД - create)                 
**private** List<String> updAttrs; // список обновляемых аттрибутов                 
**private** **byte**[] data; //сериализованый объект ПФ                             
}                                                                                     
/**                                                                                 
* Тип операции в случае векторов изменений                                           
*/                                                                                   
**public** **enum** OperationType {                                                   
CREATE, UPDATE, DELETE;                                                               
}                                                                                     

Правила заполнения:

  1. В DataContainer#key вносится ключ по формату обмена данными с КАП и обмена внутренних данных. Формат ключей для объектов - без префикса типа (невербальный).

  2. В DataContainer#entryType вносится полный тип передаваемого объекта.

  3. В DataContainer#version вносится версия передаваемого объекта.

  4. В DataContainer#operType вносится Create (не изменяемо).

  5. В DataContainer#updAttrs вносится пустой список.

  6. В DataContainer#data вносятся сериализованные данные (согласно соглашений о сериализации, описанных выше, либо индивидуального контракта).

Объект QualityBatchResult - пакет для отправки в потоке результатов формирования пачки

Транспортное сообщение QualityBatchResult:

public class QualityBatchResult extends PartitionResult<byte[]> {
private static final long serialVersionUID = 6961184569997647971L;                                    
public QualityBatchResult(long responseId, String requestId, Timestamp responseTimestamp,                     
int partTotal, int partCurrent, String partHash, String entryType, String zoneId,                             
byte[] data, String hash) {                                                                                     
super(responseId, requestId, responseTimestamp, partTotal, partCurrent, partHash, entryType, zoneId, data, hash);
}                                                                                                                     
}                                                                                                                     

Отличается способ использования транспортного контейнера. Если в исходном API ТКД источник реализовывал на своей стороне интерфейс public interface QualityDataTransportApi, в текущем API применяется иной подход:

  1. Для получения образца источник реализует на своей стороне API public interface QualityDataSampleApi, а именно - его методы EstimateResult (оценки размера пачки) и getQualityBatch (собственно запрос данных).

  2. После построения контейнера LoadResult он сериализуется в массив байт с использованием стандартной Java сериализации.

  3. Поток байт разбивается на части размером по 900 кБайт (946176 байт), и каждый такой фрагмент упаковывается в объект QualityBatchResult. Далее все эти QualityBatchResult отправляются Archiving в потоке.

  4. Ответ в формате потока в виде объектов QualityBatchResult источник отправляет на API TCA  public interface QualityDataSampleLoad, а именно метод loadQualityBatch. Метод потоковый, т.е. ожидает получения следующей партиции после получения предыдущей, при условии что в составе партиции получен код состояния QualityBatchState.PROCESS.

Устанавливается таймаут, в течение которого Archiving ожидает как получение порции данных контейнера после вызова getQualityBatch, так и получение следующей партиции при ее наличии: по умолчанию таймаут составляет 10 минут.

Интерфейс и методы для отправки запроса к источнику по списку идентификаторов#

Интерфейс для получения данных по списку идентификаторов:

public interface QualityDataSampleApi {
...                                                                                                                   
@ApiMethod(apiName = "getQualityBatchSize", version = "0.0.1")                                                    
EstimateResult getQualityBatchSize(String type) throws BatchEstimateException;                                     
@ApiMethod(apiName = "getQualityBatch", version = "0.0.1")                                                        
void getQualityBatch(List<String> keys, String type, String requestId) throws QualityBatchCreationException;
...                                                                                                                   
}                                                                                                                      

Интерфейс QualityDataSampleApi реализуется источником.

В метод getQualityBatch в качестве значения ключа key вносится ключ по формату обмена данными с КАП и обмена внутренних данных. Формат ключей для объектов (раздел формат ключей) - без префикса типа (невербальный) с экранированными разделителями.

Интерфейс и методы  для получения потока идентификаторов по типу (образца)#

Интерфейс для получения потока полных версий объектов:

public interface QualityDataSampleLoad {
...                                                                                                        
@ApiMethod(apiName = "loadQualityBatch", version = "0.0.1")                                            
void loadQualityBatch(QualityBatchResult qualityBatchResult) throws PartitionResultAcceptException;
...                                                                                                        
}                                                                                                           

Интерфейс LoadQualityBatch. Реализуется на стороне Archiving, вызывается со стороны ПФ либо непосредственно с соблюдением контракта, либо - с использованием клиентской библиотеки Archiving, которая инкапсулирует формирование потока и всех необходимых хешей и идентификаторов.

Фиксируется следующий контракт:

  • В одном пакете QualityBatchResult содержится набор байт, представляющих фрагмент контейнера LoadResult, контейнер сериализован в поток с использованием Java сериализации.

  • Количество байт в одном сообщении не превышает 900 кБайт (946176 байт).

  • Фрагменты отправляются последовательно в порядке, соответствующем порядку следования фрагментов при их разбиении на стороне источника.

  • Поле responseId в контейнере заполняется монотонно возрастающими значениями, уникальными в пределах одного запроса requestId, порядок сортировки этих идентификаторов соответствует порядку частей контейнера для его последующего склеивания на стороне Archiving и десериализации.

  • Хеш partHash считается от всего объекта QualityBatchResult для каждой партиции.

  • Хеш hash считается от полного набора байт всего сериализованного контейнера LoadResult до его разбивки на части.

Библиотека Archiving для формирования объектов потока семпла идентификаторов и полных версий объектов#

Для удобства и унификации реализации клиентской части Archiving предоставляет библиотеку, инкапсулирующую формирование ответного потока данных в Archiving в части серилизации и разбиения данных:

  • Списка идентификаторов на части, в соответствии с контрактом.

  • Транспортного Контейнера LoadResult на части в соответствии с контрактом.

Классы данной библиотеки содержатся в модуле data-transport-api.

Библиотека предствляет несколько основных классов, каждый из которых будет подробно рассмотрен ниже:

  • InitDataSampleService - реализация интерфейса InitDataSampleApi, содержащая общую логику серилизации и отпарвки данных начальной загрузки.

  • QualityDataSampleService - реализация интерфейса QualityDataSampleApi, содержащая общую логику серилизации и отпарвки данных ТКД.

А так же несколько утилитарных классов серилизации JavaSerializationUtils, KryoSerializationUtils и поулчения хеша по алгоритму SHA-256 HashingUtils.

InitDataSampleService#

Класс InitDataSampleService реализует интерфейс InitDataSampleApi и содержит общую логику отправки начальных данных источника в Archiving. При отправке данных источника обеспечивается их серилизация и разбиение на партиции. Разбиение и отправка осуществляются асинхронно, пул потоков может быть передан через конструктор. Если использовать конструктор без пула потоков, то будет создан пул по умолчанию, который содержит InitDataSampleService*#DEFAULT_NUMBER_OF_THREADS* потоков. Кастомная логика функционирования сервиса реализуция источником данных в виде набора функций и консьюмера, которые передаются сервису в коструктор.

Полный констурктор сервиса:

/**                                                                                                       
* Констуктор.                                                                                              
*                                                                                                          
* @param tsaModule Идентификатор модуля Archiving, реализующего DataSampleLoadApi.                              
* Всегда pprbod-offline-dq-collector-v4(за исключением источника ТКП -- у них pprbod-offline-dq-collector,
* тк они интегрируются с 3-й версией).                                                                     
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                            
* @param executor Пулл потоков для асинхронного разбиения и отправки в Archiving.                               
* @param loadRequest Реализация интерфейса запросов.                                                      
* @param initLoad Функция обработки запросов инициализации начальной загрузки.                            
* @param batchCount Функция обработки запросов оценки объема начальной загрузки.                          
* @param loadBatchAsync Функция обработки запросов сэмпла данных начальной загрузки.                      
* @param abort Консьюмер обработки запросов сброса инициализации начальной загрузки.                      
*/                                                                                                         
public InitDataSampleService(String tsaModule,                                                          
String sourceZoneId,                                                                                        
ExecutorService executor,                                                                                   
InitDataSampleLoadRequest loadRequest,                                                                      
Function<String, String> initLoad,                                                                        
Function<String, EstimateResult> batchCount,                                                             
BiFunction<String, Integer, LoadBatchAsyncResultPair> loadBatchAsync,                         
Consumer<String> abort)                                                                                   
Функция обработки запросов инициализации начальной загрузки#

Обработку запроса инициализации начальной загрузки источник осуществляет через функцию обработки запросов инициализации начальной загрузки. На вход данная функуия принимает тип сущности, для которой инициализируется начальная загрузка. Функция возвращает идентификатор загрузки, который исползуется для запросов оценки объема начальной загрузки, запроса сэмпла данных началной загрузки или запроса сброса инициализации начальной загрузки.

Функция обработки запросов оценки объема начальной загрузки#

Обработку запроса оценки объема начальной загрузки источник осуществляет через функцию обработки запросов оценки объема начальной загрузки. На вход данная функция принимает идентификтор начальной загрузки, полученный при выполнении запроса инициализации начальной загрузки InitDataSampleApi#getBatchCount(String). Результат функции EstimateResult содержит количество партиций, на которые разбиты данные начальной загрузки.

Функция обработки запросов сэмпла данных начальной загрузки#

Обработку запроса сэмпла данных начальной загрузки источник осуществляет через функцию обработки запросов сэмпла данных начальной загрузки. На вход данная функция принимает идентификатор инициализации начальной загрузки, полученный при выполнении запроса инициализации начальной загрузки, а так же индекс требуемой партиции сэмпла данных. Результат работы функции LoadBatchAsyncResultPair содержит тип сущности, а также  LoadResult, содержащий список объектов, вохдящих в партицию сэмпла данных. Для серелизации объекта LoadResult используется утилитный класс JavaSerializationUtils. LoadResult, в свою очередь, будет разбит на партиции не превыщающие InitDataSampleService#DEFAULT_MAX_PARTITION_SIZE байт и передан через метод InitDataSampleLoadRequest#loadInitBatch(InitBatchResult).

Общий хеш ответа источника расчитывается по экземпляру LoadResult. Хеш партиции расчитывается от массива байт, содержащихся в ответе партиции.

Консьюмер обработки запросов сброса инициализации начальной загрузки#

Обработку запроса сброса инициализации начальной загрузки источник реализует через консьюмер обработки запросов сброса инициализации начальной загрузки. На вход данный консьюмер принимает идентификатор инициализации начальной загрузки, полученный при выполнении запроса инициализации начальной загрузки.

QualityDataSampleService#

Класс QualityDataSampleService реализует интерфейс QualityDataSampleApi и содержит общую логику серилизации, разбиения на партиции и отправки данных ТКД источника данных. Разбиение и отправка осуществляются асинхронно, пул потоков может быть передан через конструктор. Если использовать конструктор без пула потоков, то будет создан пул по умолчанию, который содержит QualityDataSamplkeService#DEFAULT_NUMBER_OF_THREADS потоков. Данные, предназначенные для отправки в Archiving, будут получены через ряд фунцкций, требуемых для создания экземпляра класса QualityDataSampleService.

Полный конструктор сервиса:

/**                                                                                                       
* Констуктор.                                                                                              
*                                                                                                          
* @param tsaModule Идентификатор модуля Archiving, реализующего DataSampleLoadApi.                              
* Всегда pprbod-offline-dq-collector-v4(за исключением источника ТКП -- у них pprbod-offline-dq-collector,
* тк они интегрируются с 3-й версией).                                                                     
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                            
* @param executor Пулл потоков для асинхронного разбиения и отправки в Archiving.                               
* @param sampleLoadRequest Реализация интерфейса запросов.                                                
* @param batchEstimate Функция обработки запросов оценки объема данных.                                   
* @param loadKeys Функция обработки запроса ключей.                                                       
* @param loadData Функция обработки запроса сэмпла объектов.                                              
*/                                                                                                         
public QualityDataSampleService(String tsaModule,                                                       
String sourceZoneId,                                                                                        
ExecutorService executor,                                                                                   
QualityDataSampleLoadRequest sampleLoadRequest,                                                             
Function<String, EstimateResult> batchEstimate,                                                           
BiFunction<String, Integer, Iterable<String>> loadKeys,                                                 
BiFunction<List<String>, String, LoadResult> loadData)                                                  
Функция обработки запросов оценки объема данных#

Обработку запроса оценки количества ключей источник реализует через функцию обработки запросов оценки объема данных. На вход данная функция принимает тип сущности, для которой необходимо проевести оценку объема данных на текущую дату. Функция возвращает экземпляр EstimateResult, который может содержать количество пачек доступных для загрузки объектов заданного типа на текущую дату.

Функция обработки запроса ключей#

Обработку запроса ключей источник реализует через функцию обработки запроса ключей. В качестве входных параметров данная функция принимиает тип запрашиваемых сущностей, а также количество запрашиваемых ключей. Ожидается, что функция вернет итератор, содержащий не больше ключей, чем было запрошено, актуальных на текущую дату. Ключи будут разбиты на партиции, содержащие не более QualityDataSampleService#DEFAULT_MAX_KEY_CHARS_COUNT_IN_PARTITION символов и переданы через метод QualityDataSampleLoadRequest#loadQualitySample(SampleResult).

Общий хеш ответа источника расчитывается по всем ключам итератора. Хеш партиции расчитывается от ArrayList<String>, содержащего ключи партиции.

Функция обработки запроса сэмпла объектов#

Обработку запроса сэмпла данных источник реализует через функцию обработки запроса сэмпла объектов. В качестве аргументов данная функция принимает список ключей запрашиваемых экземпляров сущности и тип сущности. Результат LoadResult содержит список запрашиваемых объектов, актуальных на текущую дату. Для серелизации объекта LoadResult используется утилитный класс JavaSerializationUtils. Результат будет разбит на партиции, не превыщающие QualityDataSampleService#DEFAULT_MAX_PARTITION_SIZE байт и передан через метод QualityDataSampleLoadRequest#loadQualityBatch(QualityBatchResult).

Общий хеш ответа источника расчитывается по экземпляру LoadResult. Хеш партиции расчитывается от массива байт, содержащихся в ответе партиции.

Общие замечания#

Общие параметры конструкторв сервисов:

tsaModule - идентификатор модуля Archiving, реализующего DataSampleLoadApi. Всегда pprbod-offline-dq-collector-v4 (за исключением источника ТКП - у них pprbod-offline-dq-collector, так как они интегрируются с 3-й версией). sourceZoneId - ММТ зона, в которой находится экземпляр данного сервиса.

Хеш код для соответвующих полей ответа источника расчитывается по алгоритму SHA-256.

Количество потоков, количество символов в партиции ответа на запрос ключей, а так же количество байт в партиции ответа на запрос сэмпла объектов могут быть изменены. Для этого необходимо унаследоваться от InitDataSampleService или QualityDataSampleService и переопределить соответствующие методы QulityDataSampleService*#numberOfThreads, QulityDataSampleService*#getMaxKeyCharsCountInPartition*, QulityDataSampleService*#getMaxPartitionSize*.

API 4 поколения#

Api 4 поколения - спецификация gRPC API для процесса оффлайн ТКД для использования в 4 поколении Платформы без использования с ММТ.

Спецификация построена на основе транспортных объектов и структур ММТ API третьего поколения и является его продолжением, функционально возможности API 4 и 3 поколения идентичны. API представлено спецификацией Java транспортных типов, идентичных третьему поколению и вспомогательной клиентской библиотекой, входящей в артефакт API, которая инкапсулирует внутри себя разбиение результирующего массива объектов с полными версиями источников на фрагменты, размер которых соответствует поддерживаемому Платформой транспортному контракту. Для построения транспортного объекта и его разбиения в 4 поколении и 3 поколении (ММТ) применяется единая библиотека, входящая в артефакт Archiving data-transport-api.

API ПФ представляет собой два потоковых gRPC - сервиса:

  • cервис получения образца идентификаторов по перечню типов;

  • cервис получения образца источника (полных версий объектов) по списку (образцу) идентификаторов и типу.

Сервис получения образца идентификаторов#

Сервис получения семпла идентификаторов предназначен для получения от ПФ перечня идентификаторов  по запрошенному типу  данных.  Сервис является потоковым - ответ на один запрос представляет собой от 1 до N ответов в потоке. Если источник располагается в разных зонах (шардах) - то сервис так же должен присутствовать в каждой из зон, так как Archiving выполняет запрос во все зоны ПФ, с последующим объединением идентификатора и сохранением информации о том, из какой зоны какие идентификаторы были фактически получены.

Сервис является синхронным: Archiving после отправки ответа ожидает получения ответа в потоке, вызов является блокирующим - до получения всего потока (либо истечения таймаута получения элемента потока) запрос считается выполняющимся, ответ в ФД не отдается.

Формат сообщения запроса образца идентификаторов по типу:

message PprbOffDqSampleRequestItem {                         
string entry_type = 1; // Тип даных                          
string global_type = 2; // Глобальный тип данных             
uint32 sample_threshold = 3; // Ограничение на размер семпла
uint64 request_id = 4; // ID запроса                         
Timestamp request_timestamp = 5; // Время запроса            
}                                                            

Идентификатор запроса - обязательный атрибут, по нему Archiving при обработке потока выполняет построение корреляции полученных ответов в потоке с исходным запросом. Параметр EntryType - тип, по которому запрошены данные. Параметр global_type - корневой (глобальный тип), может быть пустым, в таком случае не учитывается. Параметр sample_thershold определяет максимальное количество идентификаторов, которые должны войти в семпл (в штуках).

Формат элемента потока ответа

Потоковый ответ - образец идентификаторов:

message PprbOffDqSampleResponse {                                                                                
uint64 response_id = 1; // ID партиции ответа                                                                    
uint64 request_id = 2; // ID запроса с которым коррелирует ответ                                                 
Timestamp response_timestamp = 3; // Время формирования части ответа                                             
uint32 part_total = 4; // Количество партиций на которые разделен ответ (N)                                      
uint32 part_current = 5; // Номер текущей партиции (0..N-1)                                                      
string part_hash = 6; // Хеш текущей части семпла для проверки на стороне приемника                              
string entry_type = 7; // Тип объекта                                                                            
string zone_id = 8; // ММТ зона источника откуда приехали и дентификаторы                                        
repeated string sample_data = 9; // Данные ответа (сам список идентификаторов), одна строка - один идентификатор
enum SampleState {                                                                                           
FINAL = 0;                                                                                                       
PROCESS = 1;                                                                                                     
FAIL = 2;                                                                                                        
};                                                                                                               
SampleState sample_state = 10; // Статус потока (партиции)                                                       
string hash = 11; // Хеш всего образца для проверки на стороне приемника после получения всех партиций   
}                                                                                                                

Сообщение потокового ответа предполагает разбивку ответа на стороне ПФ при формировании ответа, в зависимости от длины полученного списка идентификаторов. Т.к. идентификатор всегда либо представляет собой строку, либо всегда приводится к ней - процедура разбиения длинного списка идентификаторов тривиальна и не требует какой - либо библиотечной инкапсуляции. В качестве контракта фиксируется, что длина одного фрагмента списка идентификаторов должна быть не более 1000 символов. Если в результате построения семпла на стороне источника список идентификаторов превышает эту величину - необходимо выполнять отправку этого списка в несколько партиций.

Отправляемый пакет всегда должен содержать идентификатор исходного запроса (того, в ответ на который сформирован пакет). Отправляемые пакеты должны нумероваться монотонно возрастающей последовательностью целых чисел, начиная с 0.

Если пакетов больше одного, то все пакеты, за исключением финального, должны иметь значение sample_state=PROCESS, последний (или единственный) пакет всегда принимает значение sample_state=FINAL. Если при формировании пакета на стороне ПФ возникает исключение, либо иная ситуация, в результате которой сформировать пакет невозможно - должен быть отправлен пакет со значением sample_state=FAIL - обработка потока на стороне Archiving будет остановлена, в КАП будет отправлена информация о том, что построить семпл не получилось

Для контроля целостности каждый пакет потока ответа сопровождается хешами:

  • hash - хеш полного семпла, вычисленный как значение хеша от строки -  всех идентификаторов, вошедших в выгрузку, сконкатенированных в единую строку через разделитель  точку с запятой, порядок конкатенации строго соответствует порядку следования идентификаторов в пакете.

  • part_hash  - хеш текущего пакета, вычисленный как значение хеша от всех идентификаторов, вошедших в текущий пакет, сконкатенированных в единую строку через разделитель  точку с запятой, порядок конкатенации строго соответствует порядку следования идентификаторов в пакете.

Внимание:

В случае отсутствия идентификаторов в зоне, в которую был получен запрос, ответ должен содержать в потоке единственный пакет, с нулевым количеством sample_data и значение хешей, посчитанное от пустой строки.

Потоковый сервис построения образца идентификаторов имеет следующую спецификацию

Транспортное сообщение запроса образца одного типа:

service PprbOffDqSampleService {                                                     
rpc ComputeSample (PprbOffDqSampleRequest) returns (stream PprbOffDqSampleResponse);
}                                                                                    

Сервис получения образца источника (полных версий объектов)#

Сервис предназначен для получения Archiving от ПФ полных версий объектов по заданному типу и запрошенному перечню идентификаторов.

Формат сообщения запроса размера образца источника:

message PprbOffDqEstimateBatchRequest {                                                                           
uint64 request_id = 1; // ID запроса списка объектов по списку идентификаторов                                    
string entry_type = 2; // Тип объекта                                                                             
repeated string sample_data = 3; // Список идентификаторов для оценки размера партиции готовых объектов по списку
}                                                                                                                 

Идентификатор запроса - обязательный атрибут, по нему Archiving при обработке потока выполняет построение корреляции полученных ответов в потоке с исходным запросом. Параметр EntryType -  тип, по которому запрошены данные. Параметр  global_type -  корневой (глобальный тип), может быть пустым, в таком случае не учитывается. Парамет sample_data определяет список идентификаторов, минимум должен быть один, максимум не фиксируется контрактом, но не более 1000.

Формат ответа на запрос размера образца:

message PprbOffDqEstimateBatchResult{                                                       
uint32 sample_size = 1; // Размер партиции в штуках объектов ПФ                             
enum PartitionEstimateCode{                                                             
PS_DEFAULT = 0; // ПФ не умеет оценивать по списку ключей размер партиции            
PS_ADAPTIVE_READY = 1; // ПФ оценила размер портиции и подготовила данные для выгрузки
PS_ADAPTIVE_PENDING = 2; // ПФ подготавливает данные;                               
};                                                                                         
PartitionEstimateCode estimate_result = 1; // Статус оценки                                
}                                                                                          

Формат запроса семпла источника:

message PprbOffDqLoadRequest {                                                                    
uint64 request_id = 1; // ID запроса списка объектов по списку идентификаторов                    
string entry_type = 2; // Тип объекта                                                             
repeated string sample_data = 3; // Список идентификаторов для запроса готовых объектов по списку
}                                                                                                 

Идентификатор запроса - обязательный атрибут, по нему Archiving при обработке потока выполняет построение корреляции полученных ответов в потоке с исходным запросом. Должен совпадать с идентификатором, по которому ранее был выполнен запрос размера семпла. Параметр EntryType -  тип, по которому запрошены данные. Параметр  global_type - корневой (глобальный тип), может быть пустым, в таком случае не учитывается.Параметр sample_data определяет список идентификаторов, минимум должен быть один, максимум не фиксируется контрактом, но не более 1000.

Формат элемента потока образца  источника:

message PprbOffDqLoadResult {                                                                          
uint64 response_id = 1; // ID партиции ответа                                                          
uint64 request_id = 2; // ID запроса списка объектов по списку идентификаторов                         
Timestamp response_timestamp = 3; // Время запроса                                                     
uint32 part_total = 4; // Количество партиций на которые разделен ответ (N)                            
uint32 part_current = 5; // Номер текущей партиции (0..N-1)                                            
string part_hash = 6; // Хеш текущей части семпла для проверки на стороне приемника                    
string entry_type = 7; // Тип объекта                                                                  
string zone_id = 8; // ММТ зона ичточника откуда приехали объекты                                      
bytes part = 9; // Данные ответа (CRYO- сериализованный объект LoadResult)                             
enum DataState { // Статус отправки                                                                
FINAL = 0; // Последний фрагмент, остановка потока                                                     
PROCESS = 1; // Есть еще фрагменты                                                                     
FAIL = 2; // Ошибка, остановка потока                                                                  
};                                                                                                     
DataState sample_state = 10; // Статус потока (партиции)                                               
string hash = 11; // Хеш всего образца для проверки на стороне приемника после получения всех партиций
}                                                                                                     

Сервис получения размера образца (синхронный, унарный) и собственно потока образца (потоковый)

Потоковый ответ, образец источника:

service PprbOffDqEstimateQualityBatch {                                                   
rpc ComputeSample (PprbOffDqEstimateBatchRequest) returns (PprbOffDqEstimateBatchResult);
}                                                                                         
// Загрузка партиции                                                                      
service PprbOffDqLoadQualityBatch {                                                       
rpc LoadQualityBatch (PprbOffDqLoadRequest) returns (stream PprbOffDqLoadResult);         
}                                                                                         

Артефакты API#

Примечание:

Описанное в данном документе API реализовано только в Archiving версии 4.x.x и выше, в Archiving 3 данный функционал отсутствует.

Maven артефакты DTO, ММТ API интерфейсов и вспомогательных библиотек#

<dependency>                             
<groupId>sbp.ts.pprbod<groupId>           
<artifactId>data-transport-api<artifactId>
<version>04.005.07<version>           
<dependency>                          

Протокол четвертого поколения#

Пример:

syntax="proto3";                                                                                                
import "google/protobuf/timestamp.proto";                                                                   
import "google/protobuf/struct.proto";                                                                      
package "com.sbt.pprbod.data.transport";                                                                    
// Сообщения и типы для обработки образцов источника                                                               
// Элемент запроса образца: запрос по одному типу с граничными условиями                                           
message PprbOffDqSampleRequest {                                                                                  
string entry_type = 1; // Тип даных                                                                               
string global_type = 2; // Глобальный тип данных                                                                  
uint32 sample_threshold = 3; // Ограничение на размер семпла                                                      
uint64 request_id = 4; // ID запроса                                                                              
Timestamp request_timestamp = 5; // Время запроса                                                                 
}                                                                                                                 
// Часть образца - сообщение содержащее один или несколько идентификаторов, составляющих образец       
message PprbOffDqSampleResponse {                                                                                 
uint64 response_id = 1; // ID партиции ответа                                                                     
uint64 request_id = 2; // ID запроса с которым коррелирует ответ                                                  
Timestamp response_timestamp = 3; // Время формирования части ответа                                              
uint32 part_total = 4; // Количество партиций на которые разделен ответ (N)                                       
uint32 part_current = 5; // Номер текущей партиции (0..N-1)                                                       
string part_hash = 6; // Хеш текущей части семпла для проверки на стороне приемника                               
string entry_type = 7; // Тип объекта                                                                             
string zone_id = 8; // ММТ зона ичточника откуда приехали и дентификаторы                                         
repeated string sample_data = 9; // Данные ответа (сам список идентификаторов)                                    
enum SampleState {                                                                                            
FINAL = 0;                                                                                                        
PROCESS = 1;                                                                                                      
FAIL = 2;                                                                                                         
};                                                                                                                
SampleState sample_state = 10; // Статус потока (партиции)                                                        
string hash = 11; // Хеш всего образца для проверки на стороне приемника после получения всех партиций            
}                                                                                                                 
// Сервисы источника                                                                                              
// Запрос образца по источнику - потоковый сервис                                                                  
service PprbOffDqSampleService {                                                                                  
rpc ComputeSample (PprbOffDqSampleRequest) returns (stream PprbOffDqSampleResponse);                              
}                                                                                                                 
// Запросы полной версии объекта по списку идентификаторов (с учетом партиционирования)                           
// Оценка размера пачки объектов по образцу                                                                        
message PprbOffDqEstimateBatchRequest {                                                                           
uint64 request_id = 1; // ID запроса списка объектов по списку идентификаторов                                    
string entry_type = 2; // Тип объекта                                                                             
repeated string sample_data = 3; // Список идентификаторов для оценки размера партиции готовых объектов по списку
}                                                                                                                 
message PprbOffDqEstimateBatchResult{                                                                             
uint32 sample_size = 1; // Размер партиции в штуках объектов ПФ                                                   
enum PartitionEstimateCode{                                                                                   
PS_DEFAULT = 0; // ПФ не умеет оценивать по списку ключей размер партиции                                    
PS_ADAPTIVE_READY = 1; // ПФ оценила размер портиции и подготовила данные для выгрузки                       
PS_ADAPTIVE_PENDING = 2; // ПФ подготавливает данные;                                                        
};                                                                                                                
PartitionEstimateCode estimate_result = 1; // Статус оценки                                                       
}                                                                                                                 
// Запрос партиции по списку объектов                                                                             
message PprbOffDqLoadRequest {                                                                                    
uint64 request_id = 1; // ID запроса списка объектов по списку идентификаторов                                    
string entry_type = 2; // Тип объекта                                                                             
repeated string sample_data = 3; // Список идентификаторов для запроса готовых объектов по списку                 
}                                                                                                                 
// Пакет сообщения (контейнер)                                                                                    
message PprbOffDqLoadResult {                                                                                     
uint64 response_id = 1; // ID партиции ответа                                                                     
uint64 request_id = 2; // ID запроса списка объектов по списку идентификаторов                                    
Timestamp response_timestamp = 3; // Время запроса                                                                
uint32 part_total = 4; // Количество партиций на которые разделен ответ (N)                                       
uint32 part_current = 5; // Номер текущей партиции (0..N-1)                                                       
string part_hash = 6; // Хеш текущей части семпла для проверки на стороне приемника                               
string entry_type = 7; // Тип объекта                                                                             
string zone_id = 8; // ММТ зона ичточника откуда приехали объекты                                                 
bytes part = 9; // Данные ответа (CRYO- сериализованный объект LoadResult)                                        
enum DataState { // Статус отправки                                                                           
FINAL = 0; // Последний фрагмент, остановка потока                                                                
PROCESS = 1; // Есть еще фрагменты                                                                                
FAIL = 2; // Ошибка, остановка потока                                                                             
};                                                                                                                
DataState sample_state = 10; // Статус потока (партиции)                                                          
string hash = 11; // Хеш всего образца для проверки на стороне приемника после получения всех партиций
}                                                                                                                 
// Сервисы                                                                                                        
// Оценка размера                                                                                                 
service PprbOffDqEstimateQualityBatch {                                                                           
rpc ComputeSample (PprbOffDqEstimateBatchRequest) returns (PprbOffDqEstimateBatchResult);                         
}                                                                                                                 
// Загрузка партиции                                                                                              
service PprbOffDqLoadQualityBatch {                                                                               
rpc LoadQualityBatch (PprbOffDqLoadRequest) returns (stream PprbOffDqLoadResult);                                 
}                                                                                                     

Транспортный контейнер#

Пример:

{                                                                                                                                       
"name": "PprbOffDqSampleDataContainer",                                                                                             
"namespace": "com.sbt.pprbod.avro.journal.classes",                                                                                 
"type": "record",                                                                                                            
"fields": [                                                                                                                  
{"name": "data_type", "type": "string" }, // "PprbData_1.0"                                                                  
{"name": "message_id", "type": "string"}, // Id сообщения, уникален для каждого контейнера                                    
{"name": "pprbod_client_id", "type": "string"}, //pprbod_client_id - id клиента-отправителя (уникален для каждого инстанса Archiving)
{"name": "message_timestamp", "type": "long"}, // время на момент формирования контейнера                                     
{"name": "global_type", "type": "string"}, // Тип объекта                                                                     
{"name": "zone_id", "type": "string"}, // ММТ зона ичточника откуда приехали и дентификаторы                                   
{"name": "PprbOffDqSampleResponse", "type":{ // Массив идентификаторов (собственно образец)                                       
"type": "array",                                                                                                              
"items": "string"                                                                                                               
}                                                                                                                       
},                                                                                                                           
{"name": "cont_hash", "type": "bytes"} // Хеш контейнера                                                                       
]                                                                                                                           
}                                                                                                                            

Сбор метрик К4 с использованием возможностей библиотеки data-transport-api#

Устройство библиотеки data-transport-api в части сбора метрик Init и ТКД#

Т.к. библиотека предполагает использование ее как в 3-м поколении платформы (WildFly) так и в 4-м (стек k8s или OpenShift (опционально)), в которых используется разный стек технологий для сбора метрик (custodian и micrometer+prometeus соответственно), в библиотеке был предусмотрен ряд абстрактных интерфейсов для сбора метрик:

public interface TsaStopwatch {               
void start();                                      
void stop();                                         
}                                                    
public interface TsaCounter {                     
void inc();                                     
}                                            
public interface TsaMonitoringService {              
TsaCounter counter(String name, TsaTags tsaTags);
TsaStopwatch stopwatch(String name, TsaTags tsaTags);
}                                               
public final class TsaTags {                         
private final TagsType tagsType;                         
private final Map<String, String> tags;                  
public TsaTags(TagsType tagsType, Map<String, String> tags) {
this.tagsType = tagsType;                                      
this.tags = tags;                                              
}                                                                 
}                                                                 

Данные интерфейсы имеют 3 набора реализаций. Точкой входа является реализация класса com.sbt.pprbod.data.monitoring.TsaMonitoringService:

  • com.sbt.pprbod.data.monitoring.custodian.CustodianMonitoringService - для сбора метрик в 3-м поколении на wildfly.

  • com.sbt.pprbod.data.monitoring.micrometer.MicrometerMonitoringService - для сбора метрик 4-м поколении на стеке k8s или OpenShift (опционально).

  • com.sbt.pprbod.data.monitoring.noop.NoOpMonitoringService - пустая реализация, не делающая ничего (обратная совместимость).

Настройка метрик на источнике, использующем протокол интеграции 4-3#

Для сбора метрик в 4-м поколении платформы используется библиотека io.micrometer. Чтобы библиотека могла выполнять сбор метрик K4, нужно в контекст spring-boot приложения добавить bean:

@Bean                                                                       
public TsaMonitoringService tsaMonitoringService(MeterRegistry meterRegistry) {
return new MicrometerMonitoringService(meterRegistry);                  
}                                                                 

Настройка сбора метрик на источники в 3м поколении платформы (WildFly)#

У классов com.sbt.pprbod.data.utils.InitDataSampleService и com.sbt.pprbod.data.utils.QualityDataSampleService были добавлены новые конструкторы:

/**                                                                   
* Конструктор.                                                                                 
*                                                                                                                        
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                                                  
* @param loadRequest Реализация интерфейса запросов.                                                                        
* @param initLoad Функция обработки запросов инициализации начальной загрузки.                                                
* @param batchCount Функция обработки запросов оценки объема начальной загрузки.                                               
* @param loadBatchAsync Функция обработки запросов сэмпла данных начальной загрузки.                                        
* @param abort Консьюмер обработки запросов сброса инициализации начальной загрузки.                                          
* @param monitoringService Интерфейс метрик.                                                                                
* @param sourceMnemonic мнемоника источника                                                                                   
*/                                                                                                                      
public InitDataSampleService(String sourceZoneId,                                                                    
InitDataSampleLoadRequest loadRequest,                                                                                 
Function<String, String> initLoad,                                                                                          
Function<String, EstimateResult> batchCount,                                                                               
BiFunction<String, Integer, LoadBatchAsyncResultPair> loadBatchAsync,                                                    
Consumer<String> abort,                                                                                             
TsaMonitoringService monitoringService,                                                                                  
String sourceMnemonic) {                                                                                              
this("pprbod-offline-dq-collector-v4", sourceZoneId, SplitAndSendUtils.createDefaultExecutor(DEFAULT_NUMBER_OF_THREADS, LOGGER),
loadRequest, initLoad, batchCount, loadBatchAsync, abort,                                                                   
monitoringService, sourceMnemonic);                                                                                   
}                                                                                                                  
/**                                                                                                           
* Конструктор.                                                                                               
* @param tsaModule Идентификатор модуля Archiving, реализующего DataSampleLoadApi.                                           
* Всегда pprbod-offline-dq-collector-v4(за исключением источника ТКП -- у них pprbod-offline-dq-collector,             
* т.к. они интегрируются с 3-й версией).                                                                         
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                                      
* @param loadRequest Реализация интерфейса запросов.                                                          
* @param initLoad Функция обработки запросов инициализации начальной загрузки.                                   
* @param batchCount Функция обработки запросов оценки объема начальной загрузки.                                 
* @param loadBatchAsync Функция обработки запросов сэмпла данных начальной загрузки.                           
* @param abort Консьюмер обработки запросов сброса инициализации начальной загрузки.                  
* @param monitoringService Интерфейс метрик.                                                                         
* @param sourceMnemonic Мнемоника источника                                                                     
*/                                                                                                                   
public InitDataSampleService(String tsaModule,                                                                
String sourceZoneId,                                                                                               
InitDataSampleLoadRequest loadRequest,                                                                           
Function<String, String> initLoad,                                                                                
Function<String, EstimateResult> batchCount,                                                                            
BiFunction<String, Integer, LoadBatchAsyncResultPair> loadBatchAsync,                                                   
Consumer<String> abort,                                                                                            
TsaMonitoringService monitoringService,                                                                                 
String sourceMnemonic) {                                                                                               
this(tsaModule, sourceZoneId, SplitAndSendUtils.createDefaultExecutor(DEFAULT_NUMBER_OF_THREADS, LOGGER),           
loadRequest, initLoad, batchCount, loadBatchAsync, abort,                                                           
monitoringService, sourceMnemonic);                                                                                 
}                                                                                                                
/**                                                                                                              
* Конструктор.                                                                                                 
*                                                                                                                 
* @param tsaModule Идентификатор модуля Archiving, реализующего DataSampleLoadApi.                                                         
* Всегда pprbod-offline-dq-collector-v4(за исключением источника ТКП -- у них pprbod-offline-dq-collector,                            
* т.к. они интегрируются с 3-й версией).                                                                                        
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                                         
* @param executor Пулл потоков для асинхронного разбиения и отправки в Archiving.                                            
* @param loadRequest Реализация интерфейса запросов.                                                                     
* @param initLoad Функция обработки запросов инициализации начальной загрузки.                                       
* @param batchCount Функция обработки запросов оценки объема начальной загрузки.                                    
* @param loadBatchAsync Функция обработки запросов сэмпла данных начальной загрузки.                                       
* @param abort Консьюмер обработки запросов сброса инициализации начальной загрузки.                              
* @param monitoringService Интерфейс метрик.                                                                   
* @param sourceMnemonic Мнемоника источника                                                                  
*/                                                                                                               
public InitDataSampleService(String tsaModule,                                                                         
String sourceZoneId,                                                                                          
ExecutorService executor,                                                                                                              
InitDataSampleLoadRequest loadRequest,                                                                                 
Function<String, String> initLoad,                                                                                    
Function<String, EstimateResult> batchCount,                                                                             
BiFunction<String, Integer, LoadBatchAsyncResultPair> loadBatchAsync,                                                    
Consumer<String> abort,                                                                                               
TsaMonitoringService monitoringService,                                                                    
String sourceMnemonic) {                                                                                            
this.executor = executor;                                                                                                
this.tsaModule = tsaModule;                                                                                              
this.sourceZoneId = sourceZoneId;                                                                                          
this.loadRequest = loadRequest;                                                                                            
this.initLoad = initLoad;                                                                                                  
this.batchCount = batchCount;                                                                                              
this.loadBatchAsync = loadBatchAsync;                                                                                      
this.abort = abort;                                                                                                       
this.monitoringService = monitoringService;                                                                                 
this.sourceMnemonic = sourceMnemonic;                                                                                     
}                                                                                                                 

И

/**                                                                                                          
* Конструктор.                                                                                                       
*                                                                                                                
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                                 
* @param sampleLoadRequest Реализация интерфейса запросов.                                               
* @param batchEstimate Функция обработки запросов оценки объема данных.                                      
* @param loadKeys Функция обработки запроса ключей.                                                          
* @param loadData Функция обработки запроса сэмпла объектов.                       
* @param monitoringService Интерфейс метрик.                                                                
* @param sourceMnemonic Мнемоника источника                                                              
*/                                                                                                              
public QualityDataSampleService(String sourceZoneId,                                                             
QualityDataSampleLoadRequest sampleLoadRequest,                                                     
Function<String, EstimateResult> batchEstimate,                                                      
BiFunction<String, Integer, Iterable<String>> loadKeys,                                              
BiFunction<List<String>, String, LoadResult> loadData,                                                      
TsaMonitoringService monitoringService,                                                                          
String sourceMnemonic) {                                                                                            
this("pprbod-offline-dq-collector-v4", sourceZoneId, SplitAndSendUtils.createDefaultExecutor(DEFAULT_NUMBER_OF_THREADS, LOGGER),
sampleLoadRequest, batchEstimate, loadKeys, loadData, monitoringService, sourceMnemonic);                          
}                                                                                                        
/**                                                                                                      
* Конструктор.                                                                                             
*                                                                                                                 
* @param tsaModule Идентификатор модуля Archiving, реализующего DataSampleLoadApi.                                          
* Всегда pprbod-offline-dq-collector-v4(за исключением источника ТКП -- у них pprbod-offline-dq-collector,       
* тк они интегрируются с 3-й версией).                                                                        
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.                                    
* @param sampleLoadRequest Реализация интерфейса запросов.                                                     
* @param batchEstimate Функция обработки запросов оценки объема данных.                                       
* @param loadKeys Функция обработки запроса ключей.                                                          
* @param loadData Функция обработки запроса сэмпла объектов.                                                   
* @param monitoringService Интерфейс метрик.                                                      
* @param sourceMnemonic Мнемоника источника                                                             
*/                                                                                                  
public QualityDataSampleService(String tsaModule,                                      
String sourceZoneId,                                                                       
QualityDataSampleLoadRequest sampleLoadRequest,                                                       
Function<String, EstimateResult> batchEstimate,                                                             
BiFunction<String, Integer, Iterable<String>> loadKeys,                                                                
BiFunction<List<String>, String, LoadResult> loadData,                                                        
TsaMonitoringService monitoringService,                                                                        
String sourceMnemonic) {                                                                                 
this(tsaModule, sourceZoneId, SplitAndSendUtils.createDefaultExecutor(DEFAULT_NUMBER_OF_THREADS, LOGGER),
sampleLoadRequest, batchEstimate, loadKeys, loadData, monitoringService, sourceMnemonic);        
}                                                                                    
/**                                                                                 
* Конструктор.                                                                                 
*                                                                                         
* @param tsaModule Идентификатор модуля Archiving, реализующего DataSampleLoadApi.                        
* Всегда pprbod-offline-dq-collector-v4(за исключением источника ТКП -- у них pprbod-offline-dq-collector,
* тк они интегрируются с 3-й версией).                                                       
* @param sourceZoneId ММТ зона, в которой находится экземпляр данного сервиса.       
* @param executor Пулл потоков для асинхронного разбиения и отправки в Archiving.                      
* @param sampleLoadRequest Реализация интерфейса запросов.                            
* @param batchEstimate Функция обработки запросов оценки объема данных.           
* @param loadKeys Функция обработки запроса ключей.                    
* @param loadData Функция обработки запроса сэмпла объектов.                        
* @param monitoringService Интерфейс метрик.                                                 
* @param sourceMnemonic Мнемоника источника                                                
*/                                                                                       
public QualityDataSampleService(String tsaModule,                
String sourceZoneId,                                    
ExecutorService executor,                                                                   
QualityDataSampleLoadRequest sampleLoadRequest,                  
Function<String, EstimateResult> batchEstimate,                        
BiFunction<List<String>, String, LoadResult> loadData,                               
TsaMonitoringService monitoringService,                                         
String sourceMnemonic) {                                                             
this.executor = executor;                                                    
this.tsaModule = tsaModule;                                                          
this.sourceZoneId = sourceZoneId;                                                   
this.sampleLoadRequest = sampleLoadRequest;                         
this.batchEstimate = batchEstimate;                                                 
this.loadKeys = loadKeys;                                                                                
this.loadData = loadData;                                                                                           
this.monitoringService = monitoringService;                                                  
this.sourceMnemonic = sourceMnemonic;  
}                                                                                                     

В качестве новых параметров monitoringService и sourceMnemonic нужно передавать настроенный bean com.sbt.pprbod.data.monitoring.custodian.CustodianMonitoringService и мнемонику источника соответственно.

Настройка bean com.sbt.pprbod.data.monitoring.custodian.CustodianMonitoringService заключается просто в его создании, описания метрик он загрузит из ресурсов библиотеки сам:

@Bean                                        
public TsaMonitoringService tsaMonitoringService() {
return new CustodianMonitoringService();    
}                                             

Примечание:

Если не использовать новые конструкторы, то в качестве monitoringService будет использован com.sbt.pprbod.data.monitoring.noop.NoOpMonitoringService, который не делает ничего.

Зависимость#

<dependency>                
<groupId>sbp.ts.pprbod<groupId>
<artifactId>data-transport-api<artifactId>
<version>04.005.01<version>  
<dependency>