Инициализирующая выгрузка и Технический контроль данных#

Точкой входа для функционала Инициализирующей выгрузки является InitDataSampleFunctions.

Точкой входа для функционала Технического контроля данных является QualityDataSampleFunctions.

SourceSystemDataProvider sourceSystemDataProvider = new HibernateDataProviderFactory()
            .setEntityManagerFactory(entityManagerFactory)
            .build();

QualityDataSampleFunctions qualityFunctions = new QualityDataSampleFunctions(sourceSystemDataProvider);
InitDataSampleFunctions initDataSampleFunctions = new InitDataSampleFunctions(sourceSystemDataProvider);

Если используется Spring, удобнее будет создать Bean для них.

Транспортная библиотека [ARCH] Platform V Archiving предоставляет Api для самостоятельной реализации функционала репликации в КАП:

QualityDataSampleService - конструктор, принимающий ссылки на функции с реализацией механизмов ТКД

InitDataSampleService - конструктор, принимающий ссылки на функции с реализацией механизмов Init.

Для интеграции с транспортной библиотекой [ARC] Platform V Archiving достаточно передать в соответствующие конструкторы ссылки на функции из соответствующих классов-фасадов.

Для удобства названия функций соответствуют названиям параметров в конструкторах транспортной библиотеки [ARC] Platform V Archiving.

Фактически это место в конфигурации будет являться точкой взаимодействия библиотеки datafabric-replication и транспортной библиотеки [ARC] Platform V Archiving.

Так же понадобится произвести настройки транспортной библиотеки и произвести подготовительные процедуры.

Актуальная информация по данным настройкам находится в документации [ARC] Platform V Archiving.

Настройки HibernateDataProviderFactory#

setEntityManagerFactory(EntityManagerFactory entityManagerFactory)

Передается EntityManagerFactory, используемый в проекте пользователя. Обязательный параметр.

Установка размера пакета данных#

setPartitionSize(int partitionSize)

Размер пакета данных влияет на скорость Init. Чем больше пакет данных, тем меньше издержек обработки. При этом, чем больше пакет данных, тем больше его размер и время на его обработку. Рекомендуется выставлять максимальный размер, проходящий по ограничению транспорта на размер сообщения и время обработки.

В соответствии с форматом CreateEvent ссылки между таблицами могут передаваться как коллекции идентификаторов. В связи с этим при выборе размера пакета данных нужно учитывать количество коллекций у сущности и их размер.

По умолчанию размер пакета данных составляет 3000 объектов.

Вынесите размер пакета данных в настройку для возможности изменять его без пересборки приложения пользователя.

Установка стратегии деления на пакеты данных#

setPartitioningStrategy(PartitioningStrategy partitioningStrategy)

Принимает экземпляр стратегии деления на пакеты данных.

Для простых стратегий доступна установка через метод, принимающий экземпляр перечисления PartitionStrategyType.

setPartitioningStrategy(PartitioningStrategyType strategy)

Параметр PartitioningStrategyType strategy должен быть вынесен в настройку, для возможности изменять его без пересборки приложения пользователя.

По умолчанию используется стратегия Autoselect.

Настройка индивидуальной стратегии деления на пакеты данных для определенной сущности#

Для разных сущностей оптимальными могут оказаться различные стратегии и размер пакета данных. Есть механизм для настройки индивидуальной стратегии для сущности. Технически он является еще одной стратегией поэтому устанавливается через метод

setPartitioningStrategy(PartitioningStrategy partitioningStrategy)

Экземпляр конфигуратора индивидуальной стратегии создается методом HibernateDataProviderFactory#partitioningStrategyFactory Для задания стратегии указывается полное имя сущности либо список полных имен сущностей, стратегия и размер пакета данных. Так же есть перегрузки, принимающие как перечисляемый тип PartitionStrategyType, так и принимающие экземпляр стратегии. Параметр, задаваемый через setPartitionSize, не используется в случае использования индивидуальной стратегии.

addStrategyForType(String typeName, PartitioningStrategy strategy)
addStrategyForTypes(List<String> typeNames, PartitioningStrategy strategy)
addStrategyForType(String typeName, PartitioningStrategyType strategyType, int partitionSize)
addStrategyForTypes(List<String> typeNames, PartitioningStrategyType strategyType, int partitionSize)

Для сущностей, для которых не указана стратегия, будет использована стратегия по умолчанию, задаваемая соответствующими методами

setDefaultStrategy(PartitioningStrategyType strategy, int partitionSize)
setDefaultStrategy(PartitioningStrategy strategy, int partitionSize)

Комплексный пример задания индивидуальной стратегии:

HibernateDataProviderFactory factory = new HibernateDataProviderFactory();
PartitioningStrategyFactory sf = factory.partitioningStrategyFactory();
HibernateDataProvider sourceSystemDataProvider = factory.setEntityManagerFactory(entityManagerFactory)
    .setPartitioningStrategy(
        sf.getIndividualStrategyBuilder()
            .setDefaultStrategy(PartitioningStrategyType.HASHCODE, 3000)
            .addStrategyForTypes(
                ImmutableList.of("com.some.Entity1", "com.some.Entity2", "com.some.Entity3"),
                PartitioningStrategyType.HASHCODE, 45000
            )
            .addStrategyForType("com.some.Entity4", PartitioningStrategyType.SIMPLE_ID, 5000)
            .addStrategyForType("com.some.Entity5", PartitioningStrategyType.SIMPLE_ID, 3000)
            .build())
    .build();

По возможности рекомендуется выносить настройки стратегии в конфигурацию, для возможности изменения без пересборки приложения.

Параметр для настройки конкретных стратегий#

setInsertBatchSize(int insertBatchSize) - тонкая настройка для стратегии CompositeId. Количество пакетов данных, вставляемых во вспомогательную таблицу за один раз. По умолчанию - 50.

setInitialFetchSize(int initialFetchSize) - тонкая настройка для стратегии CompositeId. Размер окна при чтении таблицы сущности, по которой запущен Init. По умолчанию 20000.

setPartitioningFieldName(String partitioningFieldName) - настройка для стратегий LongField и BigDecimalField.

Использование для Init и ТКД резервной БД#

useStandinDataSource()

По умолчанию выключено

Белый список#

При чтении коллекций происходит много запросов в БД, что приводит к увеличению времени сбора пакета данных. [ARC] Platform V Archiving может отсеивать по белому списку поля, не подлежащие передаче в КАП. Есть возможность подключить белый список для отсеивания полей в момент захвата данных.

setWhitelistPath(String whitelistPath)

либо

setWhiteListFile(File whiteListFile)

Главным образом функционал белого списка реализован на стороне [ARC] Platform V Archiving. Фильтрация данных при захвате данных реализована только для оптимизации.

Не допускается указывать в данной настройке белый список, отличающийся от того, что был передан в [ARC] Platform V Archiving.

Информация о структуре белого списка и способах его получения находится в документации [ARC] Platform V Archiving

Ограниченный Init и ТКД (Лимиты)#

В условиях тестов может понадобиться производить Init по части данных. Для этого есть 2 механизма, описанных ниже:

Лимит по количеству#

setLimit(int limit)

Особенности:

  • Ограничивает захват данных по количеству.

  • Не оказывает большого влияния на алгоритм Init, поэтому тест Init является показательным.

  • Не участвует при ТКД.

Лимит по значению#

addLimitValue(Class<?> type, String field, Comparable limitValue)

Особенности:

  • Ограничивает данные по значению заданного поля. (Условие фильтрации больше-или-равно).

  • Задается индивидуально для каждой сущности.

  • Оказывает большое влияние на алгоритм Init, поэтому тест Init не является показательным.

  • Участвует при ТКД.

Поле может быть следующих типов:

  • Long/long

  • Integer/int

  • BigDecimal

  • Date

  • String

Переопределение стратегии версионирования#

setVersioningStrategy(DataFabricVersioningStrategy versioningStrategy)

setVersioningStrategy(VersioningStrategy ormToolsVersioningStrategy)

В большинстве случаев переопределение стратегии версионирования не требуется. Переопределение может использоваться в случаях, когда по каким либо причинам нужно изменять версию объекта при чтении из БД. При этом за контроль корректности работы версионирования отвечает приложение пользователя.

Ускорение загрузки коллекций#

Примечание

В рамках данного раздела приняты обозначения:

  • N — размер пакета данных.

  • K — количество полей-коллекций в классе, участвующем в инициализирующей выгрузке.

По умолчанию загрузка коллекций осуществляется штатными механизмами Hibernate и Persistence. «Ленивая» (LAZY) конфигурация загрузки является оптимальной для бизнес-логики приложения пользователя, но при инициализирующей выгрузке приводит к большому количеству SQL запросов и дозагрузки коллекций (известна под названием «проблема N+1»).

Пример:

У класса, участвующего в инициализирующей выгрузке, есть K полей-коллекций. Размер пакета данных сконфигурирован как N.

Тогда для чтения одного пакета данных будет произведено 1 + N * K запросов в СУБД.

При использовании LAZY конфигурации загрузки при инициализирующей выгрузке рекомендуется использовать механизм ускорения, позволяющий загружать значения одного поля-коллекции за один запрос. Тогда, для чтения одного пакета данных, будет произведено 1 + K запросов в СУБД.

Для включения механизма используйте параметр initCollectionBatchSize:

hibernateDataProviderFactory.setCollectionFetcher(new SafeFetcherImpl(new BatchFetcherImpl(initCollectionBatchSize)))

initCollectionBatchSize — параметр, определяющий количество объектов для которых будет загружена коллекция за 1 раз (поведение аналогично параметру size аннотации @BatcheSize из Hibernate ORM). Параметр должен быть равен размеру пакета данных (setPartitionSize), исключение — если установленное значение приводит к ошибке ограничения длины запроса, в этом случае установите значение меньше чем размер пакета данных.

Ограничения механизма:

  • Не работает с сущностями с составными первичными ключами (ограничение как на сущность-владельца поля коллекции, так и на сущность-элемент коллекции).

  • При связывании коллекций по полю, не являющемся первичным ключом, не дает ожидаемого роста производительности (в реализации по умолчанию на такую коллекцию произойдет 1 + 2 * N запросов, при использовании описанного механизма — 1 + 1 + N запросов).

В случае, если прочитать коллекцию с помощью механизма ускорения не получилось, коллекция будет загружена штатными средствами Hibernate ORM. За fallback-функциональность отвечает обертка SafeFetcherImpl. При необходимости изменить данное поведение создайте собственную реализацию на основе интерфейса Fetcher (например фильтровать поля, которые всегда обрабатываются в fallback-режиме).

Для загрузки с помощью описанного механизма коллекций, сконфигурированных для «жадной» (EAGER) загрузки, установите дополнительный параметр new OverrideEagerHintFiller():

hibernateDataProviderFactory.setHintFillers(Collections.singletonList(new OverrideEagerHintFiller()))

Диагностика ошибок Init/ТКД#

Завершение процесса с ошибкой#

Поиск ошибок Platform V Persistence в логе#

Все ошибки Init и ТКД, возникающие при работе библиотеки, содержат в себе упоминание com.sbt.pprb.integration.datafabric. Таким образом для поиска ошибок библиотеки можно искать вхождения «datafabric» в общем логе.

В графическом интерфейсе выводится текст ошибки, но он не всегда содержит всю полезную информацию, так как нижняя часть сообщения об ошибке обрезается из-за ограничения на количество символов. Полную ошибку можно увидеть в логе сервера Platform V Archiving (ARC).

Ошибки в данных#

Ошибки могут возникать из-за некорректных данных в БД источника. Это может быть несоответствие данных в БД и типа поля в классе сущности. Например, в БД может быть строка, которая не соответствует по формату UUID, Enum или числовому полю. Также могут встречаться «битые» ссылки. Такие ошибки не являются дефектом библиотеки. Они будут возникать не только при Init/ТКД, но и при попытке обращения к проблемным данным штатными средствами Hibernate ORM. Такие проблемы возможно диагностировать силами источника без привлечения технической поддержки.

Пример такой ошибки:

...
Caused by: com.sbt.pprbod.exchange.exception.RemoteEndpointCallException: com.sbt.pprbod.data.transport.exception.BatchLoadException: java.lang.RuntimeException: Error at index 2 in: "caxe775aa31c"
java.lang.NumberFormatException: Error at index 2 in: "caxe775aa31c"   <-- Причина ошибки: невозможность преобразовать строку "caxe775aa31c" к UUID
at java.base/java.lang.NumberFormatException.forCharSequence(NumberFormatException.java:81)
at java.base/java.lang.Long.parseLong(Long.java:775)
at java.base/java.util.UUID.fromString(UUID.java:225)
at org.hibernate.type.descriptor.java.UUIDTypeDescriptor$ToStringTransformer.parse(UUIDTypeDescriptor.java:93)
at org.hibernate.type.descriptor.java.UUIDTypeDescriptor.wrap(UUIDTypeDescriptor.java:60)
at org.hibernate.type.descriptor.java.UUIDTypeDescriptor.wrap(UUIDTypeDescriptor.java:20)
at org.hibernate.type.descriptor.sql.VarcharTypeDescriptor$2.doExtract(VarcharTypeDescriptor.java:62)
at org.hibernate.type.descriptor.sql.BasicExtractor.extract(BasicExtractor.java:47)
at org.hibernate.type.AbstractStandardBasicType.nullSafeGet(AbstractStandardBasicType.java:257)
at org.hibernate.type.AbstractStandardBasicType.nullSafeGet(AbstractStandardBasicType.java:253)
at org.hibernate.type.AbstractStandardBasicType.nullSafeGet(AbstractStandardBasicType.java:243)
at org.hibernate.type.AbstractStandardBasicType.hydrate(AbstractStandardBasicType.java:329)
at org.hibernate.loader.Loader.extractKeysFromResultSet(Loader.java:808)
at org.hibernate.loader.Loader.getRowFromResultSet(Loader.java:732)
at org.hibernate.loader.Loader.processResultSet(Loader.java:1008)
at org.hibernate.loader.Loader.doQuery(Loader.java:964)
at org.hibernate.loader.Loader.doQueryAndInitializeNonLazyCollections(Loader.java:354)
at org.hibernate.loader.Loader.doList(Loader.java:2815)
at org.hibernate.loader.Loader.doList(Loader.java:2797)
at org.hibernate.loader.Loader.listIgnoreQueryCache(Loader.java:2629)
at org.hibernate.loader.Loader.list(Loader.java:2624)
at org.hibernate.loader.hql.QueryLoader.list(QueryLoader.java:506)
at org.hibernate.hql.internal.ast.QueryTranslatorImpl.list(QueryTranslatorImpl.java:396)
at org.hibernate.engine.query.spi.HQLQueryPlan.performList(HQLQueryPlan.java:219)
at org.hibernate.internal.SessionImpl.list(SessionImpl.java:1396)
at org.hibernate.query.internal.AbstractProducedQuery.doList(AbstractProducedQuery.java:1558)
at org.hibernate.query.internal.AbstractProducedQuery.list(AbstractProducedQuery.java:1526)
at org.hibernate.query.Query.getResultList(Query.java:165)
at org.hibernate.query.criteria.internal.compile.CriteriaQueryTypeQueryAdapter.getResultList(CriteriaQueryTypeQueryAdapter.java:76) <-- Вызов штатного механизма поиска Hibernate ORM
at com.sbt.pprb.integration.datafabric.hibernate.partitioning.HashCodePartitioningStrategy.readPartition(HashCodePartitioningStrategy.java:179)   <-- Вхождение datafabric найдено
at com.sbt.pprb.integration.datafabric.hibernate.HibernateDataProvider.readPartition(HibernateDataProvider.java:278)
at com.sbt.pprb.integration.datafabric.InitDataSampleFunctions.loadBatchAsync(InitDataSampleFunctions.java:61)
at com.sbt.pprbod.data.utils.SplitAndSendUtils.lambda$splitAndSendInitBatchBatch0$13(SplitAndSendUtils.java:351)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
...

Проблема доступа к вспомогательной таблице#

Для работы инициализирующей выгрузки требуется наличие вспомогательной таблицы. Таблица должна быть создана в БД источника, у пользователя приложения должны быть права на insert, update и delete из данной таблицы. Таблица должна быть доступна из searchpath БД (проверить можно выполнив запрос select * from init_data_partitions, без указания схемы).

При недоступности вспомогательной таблицы возникают ошибки вида:

...
Caused by: org.postgresql.util.PSQLException: ERROR: relation "init_data_partitions" does not exist
Position: 13
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:166)
at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:134)
at org.jboss.jca.adapters.jdbc.WrappedPreparedStatement.executeUpdate(WrappedPreparedStatement.java:537)
at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:197)
... 28 more

Варианты решения ошибки:

  • Отсутствует вспомогательная таблица для Init. Необходимо создать.

  • Таблица недоступна в searchpath (например создана в другой схеме). Можно решить, указав параметр jdbc-cсоединения - ?currentSchema=<имя схемы> (для PostgreSQL).

Ошибка Init not started#

Ошибка означает отсутствие информации о распределении данных по пакетам во вспомогательной таблице.

Возможные причины:

  1. Запуск нескольких Init по одному и тому же типу одновременно. При старте инициируется процесс подготовки к Init. В рамках него происходит распределение данных по нумерованным пакетам. Распределение закрепляется за конкретным типом, ему присваивается уникальный идентификатор - loadingId. При старте новой сессии Init происходит очистка распределения предыдущей сессии по этому же типу.

    Решение: не запускать Init для одного и того же типа одновременно.

  2. Наличие приложения, подписанного на те же события Kafka, но использующего другую БД. Может возникать в следствие некорректной настройки приложений на разных стендах. Таким образом запрос подготовки от Platform V Archiving (ARC) поступает на некорректный узел приложения и распределение по пакетам происходит в некорректной БД.

    Решение: перенастроить или выключить узлы «похитители» запросов.

Некорректная конфигурация запуска Init#

В конфигурации может быть допущена ошибка в имени класса. В этом случае возникает ошибка следующего вида:

Caused by: java.lang.RuntimeException: org.hibernate.MappingException: Unknown entity: com.my.package.MyClass
    at org.hibernate.metamodel.internal.MetamodelImpl.entityPersister(MetamodelImpl.java:704)
    at com.sbt.pprb.integration.datafabric.hibernate.partitioning.AutoSelectPartitioningStrategy.isComponentType(AutoSelectPartitioningStrategy.java:86)
    at com.sbt.pprb.integration.datafabric.hibernate.partitioning.AutoSelectPartitioningStrategy.init(AutoSelectPartitioningStrategy.java:45)
    at com.sbt.pprb.integration.datafabric.hibernate.partitioning.manual.ManualSelectPartitioningStrategy.init(ManualSelectPartitioningStrategy.java:53)
    at com.sbt.pprb.integration.datafabric.hibernate.HibernateDataProvider.init(HibernateDataProvider.java:82)
    at com.sbt.pprb.integration.datafabric.InitDataSampleFunctions.initLoad(InitDataSampleFunctions.java:38)
    at com.sbt.pprbod.data.utils.InitDataSampleService.initLoad(InitDataSampleService.java:234)
    at sun.reflect.GeneratedMethodAccessor2472.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.sbt.core.transport.commons.utils.CBLUtils.changeClassLoaderAndInvoke(CBLUtils.java:21)
    at com.sbt.core.transport.server.TransportServiceExporter.invoke(TransportServiceExporter.java:431)
    at com.sbt.core.transport.server.TransportServiceExporter.invokeApiMethod(TransportServiceExporter.java:215)
    at com.sbt.core.transport.server.TransportServiceExporter.onMessage(TransportServiceExporter.java:155)
    at com.sbt.core.transport.rpc.impl.RpcExecutorImpl.handleMessageWithSourceMessageChannel(RpcExecutorImpl.java:471)
    at com.sbt.core.transport.rpc.impl.RpcExecutorImpl.handleWithTrace(RpcExecutorImpl.java:423)
    at com.sbt.core.transport.rpc.impl.RpcExecutorImpl.handleMessage(RpcExecutorImpl.java:391)
    at com.sbt.core.transport.rpc.impl.RpcExecutorImpl.handle(RpcExecutorImpl.java:285)
    at sun.reflect.GeneratedMethodAccessor1076.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.google.common.eventbus.Subscriber.invokeSubscriberMethod(Subscriber.java:87)
    at com.google.common.eventbus.Subscriber$1.run(Subscriber.java:72)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Так же такая ошибка может возникать при попытке провести Init по абстрактному или embeddable классу.

Эти классы не являются самостоятельными сущностями и Init проводить по ним не допускается.

Неподдерживаемый тип в составе первичного ключа#

Для преобразования строкового представления первичного ключа в бинарный задано ограниченное число конвертеров. Если для нужного типа конвертер отсутствует, Init завершится с ошибкой:

Unexpected type: class java.time.LocalDateTime Expected one of: [int, class java.util.Date, class java.time.LocalDate, class java.lang.String, class java.sql.Timestamp, long, class java.lang.Long, class java.lang.Integer, class java.util.UUID] or enum type

Для добавления конвертера для своего типа:

  1. Создайте в своем коде реализацию интерфейса com.sbt.pprb.integration.datafabric.hibernate.converter.Converter.

  2. Добавьте новый конвертер Converters.getConverters().put(Класс_неподдерживаемого_типа.class, new Новый_экземпляр_конвертера()).

Внимание!

Важно добавить новый конвертер на этапе старта приложения, до фактического запуска Init!

Например, вызов можно поместить в метод настройки конфигурации Init HibernateDataProvider.

Ошибка таймаута#

Ошибка таймаута может возникать по нескольким причинам:

  • ошибки при чтении из БД на стороне источника. Из-за асинхронного API такие ошибки не всегда могут передаться как результат обработки запроса в Platform V Archiving (ARC);

  • инфраструктурные ошибки на транспортной Kafka;

  • долгая обработка запроса чтения из БД. В этом случае нужно увеличивать таймауты в UI компонента «Фоновые процессы» (BGPX): Таймаут обработчика и partitionTreshold.

Ошибка в компоненте «Фоновые процессы» (BGPX) при этом выглядит следующим образом:

Необработанное исключение BGPApplicationException
com.sbt.bgp.exceptions.BGPApplicationException: Задача закончила выполнение с ошибкой в шаге process
      at com.sbt.bgp.task.ProcessExecutor.execute(ProcessExecutor.java:45)
      at com.sbt.bgp.task.interaction.thread.ThreadWorker.call(ThreadWorker.java:28)
      at com.sbt.bgp.computations.api.impl.BGPWorkerApiImpl.lambda$createThreadRunnable$0(BGPWorkerApiImpl.java:60)
      at com.sbt.bgp.access.BGPRunnable.run(BGPRunnable.java:37)
      at com.sbt.access_system.client.platform.service.TaskSecurityServiceImpl$SecureTask.run(TaskSecurityServiceImpl.java:103)
      at com.sbt.bgp.access.BGPSecureTaskExecutor.lambda$execute$0(BGPSecureTaskExecutor.java:71)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Processing partition error! partition=[Batch{loadingId='5c1ae69e-96e6-40fb-90fa-fbdf32b25c1c', index=0, typeName='com.my.package.MyClass'}], source=[AE_SPARK]
      at com.sbt.pprbod.data.AbstractInitApplicationTask.process(AbstractInitApplicationTask.java:310)
      at com.sbt.bgp.task.ProcessExecutor.execute(ProcessExecutor.java:36)
      ... 10 more
Caused by: com.sbt.pprbod.data.transport.exception.BatchLoadException: An error occurred during call 'loadBatchAsync(5c1ae69e-96e6-40fb-90fa-fbdf32b25c1c,0,77ecf8ae-e45f-43f3-abb2-ae20413d2a1b)' and result waiting
      at com.sbt.pprbod.data.sample.bgp.InitApplicationTask.loadBatch(InitApplicationTask.java:70)
      at com.sbt.pprbod.data.AbstractInitApplicationTask.process(Abst

После обнаружения такой ошибки, нужно смотреть ошибки в логе источника. Отсутствие ошибки в логе источника означает инфраструктурную ошибку, либо долгий запрос. Исключить проблему долгого запроса можно, проанализировав лог на наличие info-записей по категории com.sbt.pprb.integration.datafabric.InitDataSampleFunctions.

loadBatchAsync request with loadingId=<loadingId>, index=<Индекс>  <-- факт начала обработки пакета данных.
...
Succeed loadBatchAsync request with loadingId=<loadingId>, index=<Индекс>. Return <количество объектов в пакете данных> data containers <-- факт успешного завершения обработки пакета данных.

Важно выбирать записи с loadingId и index из сообщения об ошибке. По этим записям можно вычислить время обработки запроса, и если оно больше установленного таймаута, это означает наличие долгого запроса. Если же вычисленное значение меньше таймаута — причина ошибки инфраструктурная. Отсутствие в логе записей такого вида означает так же инфраструктурную ошибку (не дошел запрос на источник), либо не настроенное для этой категории логирование.

Важно не путать таймаут ответа источника с другими ошибками связанными с таймаутом. Например, может возникать ошибка Слейв(обработчик) не отвечает 1200 секунд, попытки перезапуска исчерпаны. Ошибки такого вида означают проблему в работе сервера Platform V Archiving (ARC).

Медленный Init#

Соотношение размера пакета данных с быстродействием#

Рекомендуется выносить параметры HibernateDataproviderFactory.setPartitionStrategy и HibernateDataproviderFactory.setPartitionSize во внешний конфигурационный файл, для возможности изменять их без пересборки приложения. Размер пакета данных подбирается в результате тестов, так как вычислить его аналитически очень сложно.

На фактический размер пакета (в байтах) влияет:

  • количество и типы полей сущности;

  • количество коллекций в сущности и максимальный размер коллекции (коллекции передаются в виде списка первичных ключей);

  • размер пакета данных, выставленный в настройках;

  • неравномерность распределения размеров коллекций по объектам.

Общая логика следующая: увеличение размера пакета данных приводит к увеличению времени обработки одного пакета, но при этом сокращается общее количество пакетов данных, и, как следствие, сокращаются издержки выборки данных из БД. То есть увеличение размера приводит к большему быстродействию. Ограничивающим фактором является размер сообщения на Kafka (в байтах).

Рекомендуется на стартовом этапе устанавливать размер пакета данных равным 3000 объектов и по результатам тестов увеличивать или уменьшать его. Если полей в сущностях мало и модель не содержит коллекций, это значение может быть увеличено многократно (пример реально работающей конфигурации - 45000).

Известные проблемы связанные с размером пакета данных:

  • Стратегия HASHCODE (по умолчанию) использует запросы БД, которые приводят к fullscan чтению таблицы и хешированию для каждой строки. При большом количестве пакетов данных (может быть вызвано как неоптимальным размером, так и большим количеством строк в БД) появляется большая утилизация ресурсов БД, что приводит к замедлению Init или даже его завершению с ошибкой.

  • Стратегия COMPOSITE_ID (в том числе при COMPOSITE_OR_SIMPLE_ID_AUTOSELECT) генерирует запрос с длинным условием. Из-за особенностей построения критериев Hibernate ORM построение слишком длинного условия приводит к ошибкам StackOverflow. Причина не в зацикливании, а в рекурсивности алгоритма работы с критериями в Hibernate ORM.

Расхождение данных в результате ТКД#

Расхождение может возникать из-за инфраструктурных проблем — потеря сообщений при потоковой репликации, либо большая задержка в Kafka. Процесс ТКД предназначен для обнаружения таких проблем. Помимо этого, расхождения появляются при нарушении принципа «прочитанное равно записанному».

Известные проблемные типы полей:

  • Типы дата-время с timezone. При вставке может указываться произвольная timezone, а при чтении БД переводит timezone к текущей.

  • Jsonb в PostgreSQL. При вставке мог быть неформатированный JSON, после чтения JSON отформатирован. Поскольку JSON обрабатывается в КАП как строка, изменение форматирования считается несоответствием данных.

  • Коллекции и связи. КАП не работает с реляционной логикой, вследствие чего при записи данных путем изменения ссылки на родителя не будет порождено событий изменения коллекции, а только событие изменения поля ссылки на родителя.

В большинстве случаев информация о самих коллекциях не нужна в КАП, поэтому распространенное решение данной проблемы — исключение полей-коллекций из белого списка (обратная ссылка в сущности-элементе остается).

Не исключены другие возможные несоответствия. Дефектом Platform V Persistence не является. Источник самостоятельно предпринимает действия для обеспечения принципа «прочитанное равно записанному».

Также расхождения могут появляться при изменении данных в обход событийного механизма Hibernate ORM (native-query, hql, jpql, criteria-update, criteria-delete, ручные запросы к БД). Является ограничением продукта Platform V Persistence (HBR), рекомендуется отказаться от таких изменений, либо, если это невозможно, использовать механизм для ручного формирования векторов (по согласованию).