Прикладная репликация#
Принцип работы#
Плагин интеграции с Прикладным журналом предоставляет функционал, позволяющий реализовать распространение сообщений об изменениях данных посредством интеграции с Прикладным Журналом (APLJ) и прикладную репликацию путем воспроизведения операций с API EntityManager на резервной БД. Основные функции плагина:
Сбор изменений, произведенных в транзакции, и формирование вектора изменений.
Сериализация и десериализация вектора изменений.
Формирование служебных сообщений о блокировках.
Вызов API клиентской библиотеки Прикладного Журнала (APLJ) для отправки сообщений.
Обработка сообщений из Прикладного Журнала (APLJ) и воспроизведение изменений на резервной БД.
Установка и снятие прикладных блокировок.
Переключение активного источника данных по команде из Прикладного Журнала (APLJ).
Плагин подписывается на события, генерируемые библиотекой Hibernate ORM. В момент сброса изменений в БД плагин формирует вектор изменений - структуру данных, содержащую в себе сведения об измененных сущностях. Перед фиксацией транзакции вектор изменений сериализуется в транспортный формат, определяет ключ партиционирования и синхронно вызывает API клиентской библиотеки Прикладного Журнала (APLJ) для отправки сообщения. В зависимости от настроек алгоритмов также могут быть сформированы и отправлены служебные сообщения о блокировках.
Пользователь может сконфигурировать Bean подписки на события Прикладного Журнала (APLJ). В этом случае приложение будет читать сообщения об изменениях данных из Прикладного Журнала (APLJ) и воспроизводить их на резервной БД.
При указании в настройках источника данных резервной БД плагин настроит фабрику сессий таким образом, чтобы она работала с активным источником данных. При этом активный источник данных будет выбираться по команде из Прикладного Журнала (APLJ). В случае переключения в StandIn плагин получит сообщение о переключении и автоматически заменит основной источник данных на резервный.
Взаимодействие с транспортным слоем происходит посредством вызовов API клиентской библиотеки Прикладного журнала (APLJ). Пользователь должен самостоятельно реализовать подключение к Прикладному журналу (APLJ) и передать API с помощью настройки setJournalClient.
Требования и ограничения#
Наличие поля @Version. Каждая сущность прикладной модели данных должна содержать в себе целочисленное поле, помеченное аннотацией
@Version. Предпочтительный тип поля - long. см. Контроль версионирования и порядок применения при репликации.Запрет на использование cиквенсов и автоинкрементных полей. В связи с возможностью переключения контуров до окончания репликации sequence генераторы и автоинкрементные колонки не поддерживаются. Вместо них нужно использовать генераторы уникальных идентификаторов.
Ограничение использования прямых запросов. Любые HQL, JPQL или native запросы не реплицируются. Как следствие, с такими запросами безопасно можно использовать только выборку.
Обязательные настройки#
Для работы плагина нужно установить следующие обязательные параметры:
journalClient - реализация API клиентской библиотеки Прикладного Журнала (APLJ). При использовании spring boot эта реализация автоматически появится в контексте приложения.
configurator.setJournalClient(journalClient);
masterDataSource - основной источник данных:
configurator.setMasterDataSource(masterDataSource);
standinDataSource - резервный источник данных. Указывается при настройке прикладной репликации. Если плагин используется только для отправки сообщений, этот параметр можно опустить.
configurator.setStandinDataSource(standinDataSource);
moduleIdProvider - предоставляет идентификатор пользовательского модуля. Подробнее в разделе Идентификация источника данных.
configurator.setModuleIdProvider(() -> "module-id"));
journalHashKeyResolver - настройка функции партиционирования. Подробнее в разделе Партиционирование.
configurator.setJournalHashKeyResolver(hashKeyResolver);
Идентификация источника данных#
Для идентификации источника данных используется идентификатор, который задается настройкой setModuleIdProvider. Этот идентификатор будет записан в заголовок сообщения, отправляемого в Прикладной Журнал (APLJ). При обработке сообщений Прикладного Журнала (APLJ) идентификатор из заголовка сравнивается с идентификатором, переданным в настройке. Если они не совпадут, то сообщение будет отправлено в Прикладной Журнал (APLJ) как ошибочное. Это позволяет избежать обработки сообщений из других модулей, запущенных в той же зоне с тем же типом данных.
Идентификатор источника данных не должен меняться без крайней необходимости. При изменении этого идентификатора сообщения со старым идентификатором, которые были накоплены в Прикладном журнале, не смогут обработаться. При возникновении такой ситуации требуется переинициализация резервной БД.
Настройка способа захвата изменений#
Захват изменений происходит на прикладном уровне путем перехвата событий Hibernate ORM. События изменений данных формируют вектор изменений (далее вектор) — структуру, которая содержит список сущностей и значения их полей. Вектор может быть сформирован как «дельта» или как «снепшот». Вектор с типом «дельта» содержит только значения измененных полей. Вектор с типом «снепшот» содержит списки всех полей и их значения. Преимуществом «снепшот» векторов является то, что они могут обрабатываться независимо от порядка, в котором они были сформированы. Недостаток «снепшот» векторов — их размер. Такие вектора содержат в себе значения всех полей, включая коллекции, и при каждом формировании все данные, связанные с изменяемой сущностью, будут вычитываться из БД и сериализоваться для отправки потребителю.
Способ захвата изменений задается настройкой setCaptureType. Константы перечисляемого типа CaptureType:
Значение |
Описание |
|---|---|
DELTA |
Значение по умолчанию. Вектор изменений будет содержать только значения измененных полей сущности |
SNAPSHOT |
Вектор изменений будет содержать значения всех полей измененной сущности, включая коллекции |
Пример:
configurator.setCaptureType(CaptureType.DELTA)
Партиционирование#
Партиционирование сущностей необходимо для распараллеливания потока репликации и работы прикладных блокировок. Партиционирование настраивается путем передачи реализации интерфейса JournalHashKeyResolver в настройку setJournalHashKeyResolver.
Интерфейс JournalHashKeyResolver:
public interface JournalHashKeyResolver {
Set<String> getHashKeys(Collection<?> entities);
}
Функция JournalHashKeyResolver#getHashKeys вызывается перед фиксацией транзакции, и в качестве аргумента функции передается коллекция сущностей, измененных в транзакции. Реализация должна сопоставить каждой сущности ее ключ партиционирования (строковое значение, определяющее, к какой партиции относится та или иная сущность) и вернуть результирующий набор. После этого один из этих ключей будет передан в качестве ключа при отправке в Kafka и с помощью него будет выбрана партиция, а также по каждому из этих ключей может устанавливаться служебная блокировка в зависимости от выбранного метода блокировок.
Алгоритм партиционирования выбирается и реализуется пользователем, при этом пользователь ответственен за качество его реализации и эффекты, возникающие вследствие его корректной или некорректной настройки.
Партиционирование влияет на:
Степень параллелизма реплики. При некорректном распределении ключей сообщения могут попадать преимущественно в одни и те же партиции Kafka, из-за чего будет снижаться количество одновременно обрабатываемых сообщений на стороне применения, и при значительной транзакционной нагрузке реплика может отставать.
Качество порядка репликации. При наличии в одной транзакции сущностей с разными ключами партиционирования возникает неоднозначность определения партиции при отправке сообщений в Прикладной Журнал (APLJ). Это может привести к тому, что зависимые транзакции могут попадать в разные партиции и, как следствие, обрабатываться не в том порядке, в котором они были на основной БД. При отсутствии механизмов компенсации это приведет к ошибке репликации. Механизмы компенсации имеют свои издержки на упорядочивание транзакций, и, хоть они и позволяют избавиться от ошибок репликации, качество порядка реплики все равно будет влиять на производительность.
Количество записей в служебной таблице. При использовании прикладных блокировок для каждого переданного ключа партиционирования будет создана запись в служебной таблице. Это стоит учитывать при реализации функции партиционирования, так как большое количество записей может привести к разрастанию таблицы и излишнему расходу ресурсов системы хранения БД.
Доступность данных при переходе в StandIn. При использовании форсированного перевода в StandIn данные, по которым есть отставание реплики, будут заблокированы для использования. Так как данные блокируются по ключу партиционирования, от распределения ключей зависит количество сущностей, которые будут заблокированы одним неотреплицированным сообщением. К примеру, если для всех сущностей в БД используется один ключ, то любое отстающее сообщение будет блокировать для использования все данные в резервной БД.
Рекомендации для реализации функции:
После создания сущности ее ключ партиционирования не должен меняться.
Работа с данными и функция партиционирования должны быть построены таким образом, чтобы в каждой транзакции изменялись сущности по как можно меньшему количеству ключей. Идеальная ситуация - когда в транзакции меняются данные только по одному ключу. Это избавляет от возможных проблем с порядком репликации.
При необходимости ограничения размера служебных таблиц снизьте уникальность ключей партиционирования. Например, если id сущности используется в качестве ключа партиционирования — используйте в качестве ключа остаток от деления (id % n). Это не позволит количеству партиций увеличиваться.
В случае использования уникальных полей реализуйте функцию партицонирования таким образом, чтобы все транзакции с одинаковым значением уникального поля обрабатывались в рамках одной партиции.
Библиотека содержит несколько встроенных реализаций:
StaticHashKeyResolver- константная функция партиционирования. При использовании этой реализации репликация будет однопоточной. Эта реализация может быть удобной для первого запуска и тестирования, но в промышленной эксплуатации однопоточная реплика не рекомендуется к использованию.AnnotationBasedHashKeyResolver- функция партиционирования, которая использует значение поля, помеченного аннотацией@PartitionKey, в качестве ключа партиционирования.InterfaceBasedHashKeyResolver- функция партиционирования на основе интерфейсаHashKeyProvider. При использовании этой стратегии все сущности должны реализовывать интерфейсHashKeyProviderи возвращать из функцииgetHashKeyключ партиционирования.PrimitiveFieldHashKeyResolver- функция партиционирования, которая читает значение примитивного поля, указанного в конструкторе, и использует его в качестве ключа партиционирования.ReferenceFieldHashKeyResolver- функция партиционирования, которая читает значение ссылочного поля, указанного в конструкторе, и использует его в качестве ключа партиционирования.
Установка функции партиционирования:
configurator.setJournalHashKeyResolver(customPartitioner);
Значения полей, помеченных аннотацией @PartitionKey, передаются всегда, вне зависимости от установленной функции партиционирования и факта их изменения.
При работе с данными предпочтителен подход изменения в одной транзакции данных, относящихся только к одному ключу партиционирования. В этом случае практически отсутствует риск нарушения порядка репликации. Если для пользователя допустим такой подход, то он может явным образом запретить транзакции по данным, относящимся к разным ключам. Сделать это можно с помощью настройки setPartitionMultiplyingMode, которая принимает в качестве значения перечисляемый тип PartitionMultiplyingMode.
Константы перечисляемого типа PartitionMultiplyingMode:
Значение |
Описание |
|---|---|
FORBIDDEN |
Запрещает транзакции по нескольким ключам партиционирования. В случае обнаружения такой транзакции будет выброшено исключение |
WARN |
Значение по умолчанию. Разрешает транзакции по нескольким ключам партиционирования. Событие обнаружения такой транзакции будет залогировано |
ALLOW |
Разрешает транзакции по нескольким ключам партиционирования |
Пример:
configurator.setPartitionMultiplyingMode(PartitionMultiplyingMode.FORBIDDEN);
Центричность#
Классический способ реализации функции партиционирования - использование идентификаторов корневых сущностей центричной модели.
Модель данных является центричной, когда для каждой сущности модели данных можно определенным образом сказать, какой родительской сущности она принадлежит.
Для примера клиент и ряд связанных с ним атрибутов - таких как договор, реквизит, адрес и так далее. В этом примере клиент будет родительской сущностью, а связанные с ним атрибуты - дочерними. Если у каждой сущности можно узнать, какому клиенту она принадлежит, то это будет клиентоцентричная модель. Идентификатор клиента в этом случае можно использовать в качестве ключа партиционирования.
Транзакции по данным, принадлежащим одному клиенту, будут попадать в одну и ту же партицию, и между ними будет гарантия порядка отправки - применения. При отставании реплики по отдельным сущностям будут блокироваться все сущности, принадлежащие к той же родительской сущности, что и отстающие.
Распараллеливание потока репликации#
При отправке сообщения в Прикладной Журнал (APLJ) оно попадает в определенную партицию Kafka topic, при этом на стороне применения на каждую партицию будет создаваться один потребитель Kafka. Из этого следует, что сообщения одной партиции будут обрабатываться последовательно и параллельно с сообщениями из других партиций. Если записывать сообщения только в одну партицию, то при хоть сколько-нибудь значимой транзакционной нагрузке реплика начнет отставать и сообщения будут копиться в Прикладном журнале. Для того, чтобы этого не происходило, сообщения должны быть равномерно распределены по всем партициям topic. Количество партиций на topic настраивается администраторами Прикладного Журнала (APLJ). Пользователь управляет партиционированием посредством реализации функции партиционирования.
Стратегия репликации и прикладные блокировки#
Стратегия репликации устанавливает способ и порядок отправки и обработки сообщений.
Основные типы сообщений, которые может отправлять модуль источника:
Тип сообщения |
Описание |
|---|---|
ORM_CV |
Сообщение, содержащее вектор изменений |
LCK |
Сообщение о блокировке по ключу партиционирования |
ULCK |
Сообщение о подтверждении транзакции |
REJ |
Сообщение об отклонении транзакции |
При работе со стратегией репликации SIMPLE модуль источника перед коммитом транзакции отправляет сообщение, содержащее сериализованый вектор изменений ORM_CV. При установке стратегии репликации, поддерживающей прикладные блокировки, модуль источника будет также отправлять сообщение о блокировке по ключу партиционирования LCK и сообщение с подтверждением коммита транзакции ULCK.
Прикладные блокировки используются для защиты от работы с неконсистентными данными, быстрого переключения в StandIn и в алгоритмах подтверждения транзакций. Блокировки устанавливаются путем создания или изменения записи о блокировке в служебных таблицах.
Стратегия устанавливается настройкой setReplicationStrategy, которая в качестве аргумента принимает перечисляемый тип ReplicationStrategy.
Основные стратегии:
Значение |
Описание |
|---|---|
SIMPLE |
Алгоритм работы без прикладных блокировок. Не обеспечивает консистентность данных |
STANDIN_LOCKS |
Алгоритм блокировок без поддержки идемпотентности установки блокировок |
PARTITION_LOCKS |
Улучшенный алгоритм блокировок с поддержкой идемпотентности установки блокировок и подтверждения транзакций на стороне применения |
Специфичные стратегии, имеющие узкую область применения:
Значение |
Описание |
|---|---|
STANDIN_LOCKS_LEGACY |
Устаревшая. Оставлена для обратной совместимости |
STANDIN_LOCKS_CONF_SUPPORT_NONSTRICT |
Стратегия с прикладными блокировками, поддержкой подтверждения транзакций на уровне транспорта, оптимизированая для работы с большим количеством транзакций по одному ключу партиционирования |
CONF_SUPPORT_NONSTRICT |
Стратегия с поддержкой подтверждения транзакций на уровне транспорта, оптимизированая для работы с большим количеством транзакций по одному ключу партиционирования |
STANDIN_LOCKS_CONF_SUPPORT_STRICT |
Стратегия с прикладными блокировками и поддержкой подтверждения транзакций на уровне транспорта, с гарантированым подтверждением |
CONF_SUPPORT_STRICT |
Стратегия с поддержкой подтверждения транзакций на уровне транспорта, с гарантированым подтверждением |
STANDIN_LOCKS_DEPS |
Служебная стратегия с контролем зависимостей транзакций. Не для использования конечным пользователем |
CUSTOM_LOCKS |
Пользовательская реализация конвеера блокировок |
Защита от работы с данными из незафиксированных транзакций#
При работе алгоритмов прикладной репликации может возникнуть ситуация, когда после отправки сообщения об изменениях по какой-либо причине не удается зафиксировать транзакцию в основной БД. При этом сообщение уходит в Прикладной Журнал (APLJ) и реплицируется в резервную БД. В этом случае возникнет расхождение баз данных, и после переключения в StandIn прикладной модуль будет работать с неконсистентными данными.
Для защиты от работы с неконсистентными данными используются прикладные блокировки.
При работе с блокировками перед фиксацией транзакции вместе с данными будет отправлено сообщение о блокировке по ключу партиционирования. После коммита будет асинхронно отправлено сообщение о разблокировке по этому же ключу. При этом данные в резервной БД считаются доступными, если по их ключу партиционирования пришло три сообщения: блокировка, данные и разблокировка. Если хоть одно из этих сообщений не пришло, то данные остаются заблокированными и после переключения в StandIn при попытке изменения этих данных будет выброшено исключение CanNotAcquireLockException.
Стратегии репликации, обеспечивающие защиту от работы с данными из незафиксированных транзакций:
STANDIN_LOCKS
PARTITION_LOCKS
STANDIN_LOCKS_LEGACY
STANDIN_LOCKS_CONF_SUPPORT_NONSTRICT
STANDIN_LOCKS_CONF_SUPPORT_STRICT
Быстрое переключение в StandIn#
Прикладные блокировки позволяют использовать быстрое переключение в StandIn Прикладного журнала (APLJ). Прикладной журнал (APLJ) при быстром переключении ожидает завершения обработки всех накопленных LCK сообщений и после этого завершает переключение. После быстрого переключения будут установлены все блокировки, в то время как реплика по данным может отставать и будет продожать "докатываться" уже в процессе работы с резервным источником данных. При работе с данными после быстрого переключения может выбрасываться исключение CanNotAcquireLockException. Это значит, что реплика по этим данным еще не обработана, и данные не в актуальном состоянии. Как только до конца обработаются все оставшиеся сообщения, накопленные в очереди обработки, все данные станут доступными для использования.
Стратегии репликации, поддерживающие быстрое переключение в StandIn:
STANDIN_LOCKS
PARTITION_LOCKS
STANDIN_LOCKS_LEGACY
STANDIN_LOCKS_CONF_SUPPORT_NONSTRICT
STANDIN_LOCKS_CONF_SUPPORT_STRICT
Подтверждение транзакций#
Алгоритм подтверждения транзакций гарантирует, что потребителем сообщений из Прикладного журнала (APLJ) будут обработаны только те сообщения, по которым было подтверждение коммита транзакции. Это позволяет избежать расхождения данных в основной и резервной БД вследствие отказа фиксации транзакции в основной БД. Алгоритм может работать либо на транспортном уровне, либо на уровне потребителя сообщений.
Подтверждение транзакций устанавливается настройкой setConfirmationMode:
Режим |
Описание |
|---|---|
NONE |
Подтверждение транзакций не производится, потребитель сообщений получает и обрабатывает все сообщения |
APPLIER |
Подтверждение транзакций на уровне применения. Потребитель получает все сообщения, но обрабатывает только подтвержденные. Реализовано только для реплики в StandIn |
TRANSPORT |
Подтверждение транзакций на уровне транспорта. Потребитель получает только подтвержденные сообщения |
Стратегии репликации, поддерживающие подтверждение на уровне транспорта:
STANDIN_LOCKS
PARTITION_LOCKS
Стратегии репликации, поддерживающие подтверждение на уровне транспорта с ограничением — один ключ партиционирования на транзакцию:
STANDIN_LOCKS_CONF_SUPPORT_NONSTRICT
STANDIN_LOCKS_CONF_SUPPORT_STRICT
CONF_SUPPORT_STRICT
CONF_SUPPORT_NONSTRICT
Стратегии репликации, поддерживающие подтверждение на уровне применения:
PARTITION_LOCKS
Подключение режима подтверждения транзакций на уровне транспорта#
Для подключения режима подтверждения транзакций на уровне транспорта выполните:
Настройки Прикладного Журнала (APLJ)#
В Прикладном Журнале (APLJ) настройте плагины для типов данных: ORM_CV, LCK, ULCK. Плагин с типом данных ORM_CV должен работать в режиме «Ожидание при подтверждении». Пример конфигурации Прикладного Журнала (APLJ):
Настройки плагина репликации#
При использовании подтвержденных транзакций на уровне транспорта:
Включите режим подтверждения транзакции на уровне транспорта:
configurator.setConfirmationMode(ConfirmationMode.TRANSPORT);Оформите подписку с подтверждением на тип данных
ORM_CV, а также подписки на типы данныхLCK,ULCK:@Bean public SubscriptionService subscription() { SubscriptionServiceImpl subscriptionService = new SubscriptionServiceImpl(); subscriptionService.subscribeConfirmed( new ZoneId("<ZONE_ID>"), new DataType("ORM_CV"), new PluginCode("EXPORT_FUNC_SI"), new JournalConsumerImpl() ); subscriptionService.subscribe( new ZoneId("<ZONE_ID>"), new DataType("LCK"), new PluginCode("EXPORT_FUNC_SI_LCK"), new JournalConsumerImpl() ); subscriptionService.subscribe( new ZoneId("<ZONE_ID>"), new DataType("ULCK"), new PluginCode("EXPORT_FUNC_SI_LCK"), new JournalConsumerImpl() ); return subscriptionService; }Создайте Bean
com.sbt.pprb.integration.hibernate.standin.replication.ConfirmationSubscription:@Bean public ConfirmationSubscription confirmationSubscription(EntityManagerFactory entityManagerFactory) { return new ConfirmationSubscription(entityManagerFactory, <ZONE_ID>, "ORM_CV"); }
Описание стратегий репликации#
SIMPLE#
Простая стратегия репликации. Сообщение ORM_CV с сериализованым вектором изменений синхронно отправляется в транспорт перед коммитом транзакции. В случае, если после отправки транзакция не будет зафиксирована, сообщение все равно уйдет потребителю и будет обработано. Не обеспечивает консистентность реплики, не защищает от работы с неконсистентными данными, не поддерживает подтверждение транзакций и не поддерживает быстрое переключение в StandIn.
Стратегия не имеет издержек на работу, и в некоторых случаях обработка транзакций в резервной базе будет происходить быстрее, чем на основной.
STANDIN_LOCKS#
Стратегия репликации с поддержкой прикладных блокировок.
Перед коммитом транзакции синхронно отправляются два сообщения - LCK, содержащее в себе идентификатор транзакции и ключ партиционирования, и ORM_CV, содержащее сериализованый вектор изменений. После коммита асинхронно отправляется сообщение ULCK, содержащее в себе идентификатор транзакции и ключ партиционирования. При обработке любого из этих сообщений в служебную таблицу T_CRTJ_CLIENTLOCKEVENT добавляется запись о том, что по определенной транзакции пришло конкретное сообщение. При обработке первого сообщения по транзакции увеличивается счетчик блокировок по ключу партиционирования в таблице T_CRTJ_CLIENTLOCK. При обработке последнего счетчик уменьшается. Таким образом, если по транзакции не придет одно из сообщений (ORM_CV, LCK, ULCK), то счетчик блокировок по ключу партиционирования будет больше нуля. Если счетчик блокировок больше нуля, то ключ считается заблокированным, и при обращении к данным по этому ключу будет выброшено исключение CanNotAcquireLockException.
Каждое сообщение обрабатывается в своей транзакции. Это значит, что на обработку сообщений по одной транзакции на основном источнике данных, требуется три транзакции на резервном источнике данных. Так как сообщения LCK, DATA, ULCK могут обрабатываться параллельно, для применения одной транзакции могут потребоваться 3 соединения с БД. Это стоит учитывать при настройке пула подключений. Количество открытых соединений в один момент времени может достигать partitionCount * 3 соединений, где partitionCount - количество партиций Kafka topics, из которых вычитываются сообщения.
Особенностью работы этой стратегии является отсутствие поддержки идемпотентности применения сообщений и возможность ложных срабатываний в случае задвоения сообщений на уровне транспорта. Важно делать сверку баз данных при переключении в StandIn. Если сверка не показала расхождений, то в случае обнаружения заблокированных ключей блокировки можно сбросить вручную, выполнив запрос update T_CRTJ_CLIENTLOCK set SILOCK=0 where CLIENT_ID=?, где CLIENT_ID - ключ партиционирования, который заблокирован.
Эта стратегия не оптимизирована для работы с большим количеством ключей партиционирования на транзакцию. Каждый ключ партиционирования, фигурирующий в транзакции, будет приводить к новым запросам при обработки блокировок.
При использовании совместно с алгоритмом подтверждения транзакций на уровне транспорта история транзакций будет сохраняться в служебной таблице T_CRTJ_CONFIRMATIONS.
Не поддерживает подтверждение транзакций на уровне применения.
PARTITION_LOCKS#
Стратегия репликации, оптимизированая для работы с множеством ключей партиционирования на транзакцию и обеспечивающая идемпотентность применения блокировок.
В отличие от STANDIN_LOCKS, эта стратегия репликации сохраняет состояние и версию ключа партиционирования в таблицу T_CRTJ_STANDIN_SERVICE. Это позволяет гарантировать идемпотентность применения блокировок, избавляет от ложных срабатываний и добавляет возможность подтверждения транзакций на стороне применения.
Так же, как и при использовании STANDIN_LOCKS, каждое сообщение будет обрабатываться в своей транзакции.
При использовании совместно с алгоритмом подтверждения транзакций история транзакций будет сохраняться в служебной таблице T_CRTJ_CONFIRMATIONS.
Миграция со стратегии STANDIN_LOCKS на PARTITION_LOCKS#
При смене стратегии репликации меняется работа с прикладными блокировками. При переключении с Main на StandIn и обратно пользователь должен проверить наличие не реплицированных журналов и наличие блокировок.
Процесс смены стратегии:
Переход в StandIn.
Отключение репликации в Main.
Обновление Main.
Включение репликации в Main: на данном этапе пользователь должен убедится в отсутствии не реплицированных, ошибочных журналов в Прикладном журнале (APLJ), при необходимости провести сверку баз данных.
Переход в Main.
Отключение репликации в StandIn.
Обновление StandIn.
Включение репликации вStandIn.
При наличии не реплицированных журналов на 4 этапе прикладные блокировки не работают, ведется работа с неконсистентыми данными.
STANDIN_LOCKS_CONF_SUPPORT_STRICT, STANDIN_LOCKS_CONF_SUPPORT_NONSTRICT#
Эти стратегии идентичны STANDIN_LOCKS, за исключением поддержки подтверждения транзакций:
При использовании этих стратегий не будет храниться история транзакций, только идентификатор последней транзакции по ключу партиционирования.
Эти стратегии работают только при соблюдении ограничения - один ключ партиционирования на транзакцию.
STANDIN_LOCKS_CONF_SUPPORT_NONSTRICT оптимизирована для большого количества транзакций по одному ключу партиционирования.
Так же, как и при использовании STANDIN_LOCKS, каждое сообщение будет обрабатываться в своей транзакции.
Не поддерживают подтверждение транзакций на уровне применения.
CONF_SUPPORT_STRICT, CONF_SUPPORT_NONSTRICT#
Эти стратегии используют блокировки только для подтверждения транзакций на уровне транспорта, при этом блокировки не обрабатываются на уровне применения. Быстрое переключени в StandIn не поддерживается. При использовании этих стратегий не будет храниться история транзакций, только идентификатор последней транзакции по ключу партиционирования. Эти стратегии работают только при соблюдении ограничения - один ключ партиционирования на транзакцию.
Как и при режиме SIMPLE одна транзакция на основной БД соответствует одной транзакции на резервной БД.
Отсутствует защита от работы с неконсистентными данными. Это значит, что в следующих ситуациях модуль источника сможет работать с неконсистентными данными:
По какой-то причине на обработку попали неподтвержденные транзакции (быстрое переключение в StandIn, ручная переотправка сообщений).
Переключение в StandIn при наличие необработанных или ошибочных сообщений в Прикладном журнале (APLJ).
Следует использовать с осторожностью и перед переключением в StandIn делать сверку баз данных.
Не поддерживают подтверждение транзакций на уровне применения.
Контроль версионирования и порядок применения при репликации#
Для обеспечения консистентности важно, чтобы транзакции реплицировались в том же порядке, в котором они были выполнены на основной БД. Для контроля порядка применения используется версионирование сущностей на основе механизма оптимистичных блокировок Hibernate ORM. Для обеспечения этого прикладная модель должна соответствовать требованию по наличию целочисленного поля с JPA аннотацией @Version в каждой сущности. Предпочтительный тип поля - long. При работе с данными Hibernate будет автоматически увеличивать значение этого поля при каждом изменении сущности. При формировании вектора изменений в события изменений будут записаны исходные и новые версии сущностей. В случае превышения максимального значения поля версии Hibernate устанавливает минимальное отрицательное значение версии, при этом потребители сообщений могут не поддерживать такое поведение. Поэтому не рекомендуется использовать типы int и short.
При применении вектора изменений будет происходить сравнение исходной версии сущности и текущей версии записи в резервной БД. Если они равны, то транзакция будет сохранена в резервную БД с изменением версии записи на новую версию из события изменения. При различии версий возникнет исключительная ситуация, и, в зависимости от выбранного алгоритма контроля версий, плагин отреагирует на нее соответствующим образом.
Сообщения, которые не могут быть обработаны вследствие ошибки контроля версионирования, будут отброшены в Прикладной Журнал (APLJ) с соответствующей пометкой об ошибке, что отобразится в графическом интерфейсе Прикладного Журнала (APLJ). Сообщения с ошибкой в последствии можно будет отправить на повторное применение с помощью графического интерфейса Прикладного Журнала (APLJ).
Ошибки контроля версий не являются дефектом плагина и возникают вследствие неверной настройки функции партиционирования, работы с несколькими корнями партиционирования в одной транзакции либо инцидентов Прикладного Журнала (APLJ).
В случае, когда предоставленая hash key функция (настройка setJournalHashKeyResolver) допускает несколько ключей партиционирования в результирующем наборе, возникает неоднозначность выбора партиции для отправки сообщения, и может возникнуть нарушение порядка реплики. Для компенсации таких нарушений можно использовать стратегию с упорядочиванием сообщений на стороне применения, но это реализовано только для реплики в StandIn. Другие потребители могут не обеспечивать такую компенсацию.
Алгоритм контроля порядка репликации задается настройкой setOrderingControlStrategy:
Значение |
Описание |
|---|---|
DISABLED |
Контроль версий отключен |
OPTIMISTIC_LOCK_VERSION_CONTROL |
Простой контроль версий. При несовпадении версий будет выброшено исключение |
BASIC_IDEMPOTENCY_CONTROL |
Устаревшее. Контроль версий с поддержкой идемпотентности применения. Можно использовать, только если в БД нет никаких констрейнтов за исключением первичных ключей |
SIMPLE_VERSION_CONTROL |
Простой контроль версий. При несовпадении версий будет выброшено исключение |
IDEMPOTENT_VERSION_CONTROL |
Контроль версий с улучшенной поддержкой идемпотентности применения |
IDEMPOTENT_ORDERING_CONTROL |
Контроль версий с поддержкой идемпотентности применения и упорядочивания транзакций на стороне применения |
Для IDEMPOTENT_ORDERING_CONTROL реализована настройка setDynamicRetryTimeOut. При включенной настройке вектора для последующих векторов не будет применяться ожидание зависимых векторов. После первого неуспешного применения вектор будет отмечен как ошибочный. Это позволит увеличить скорость обработки реплики по другим ключам партиционирования.
Переключение в StandIn#
Переключение в StandIn инициируется в Прикладном журнале. После этого Прикладной Журнал (APLJ) рассылает в экземпляры пользовательских модулей сообщения о начале переключения. В этот момент начинается окно недоступности, и попытка изменения данных в БД с помощью Hibernate будет вызывать исключение ReourceNotAllowedException. Прикладной Журнал (APLJ) будет дожидаться окончания обработки всех накопленных сообщений и после этого разошлет сообщение об окончании переключения. После того, как пользовательский модуль получит сообщение о переключении, плагин сделает активным резервный источник данных, и работа с БД снова станет возможной.
Настройка сериализации#
Поддерживаются несколько встроенных сериализаторов для сериализации сообщений об изменениях данных. Сериализатор устанавливается настройкой setSerializerType, принимающей в качестве аргумента перечисляемый тип SerializerType. Значение по умолчанию - BINARY_KRYO.
Значение |
Описание |
|---|---|
BINARY_KRYO |
Бинарная сериализация с использованием сериализатора Kryo |
BINARY_KRYO_GZIP |
Бинарная сериализация с использованием сериализатора Kryo и gzip сжатием |
BINARY_FST |
Бинарная сериализация с использованием сериализатора Fst |
BINARY_FST_GZIP |
Бинарная сериализация с использованием сериализатора Fst и gzip сжатием |
BINARY_JAVA |
Бинарная сериализация стандартным Java сериализатором |
JSON_GSON |
JSON сериализация с использованием сериализатора Gson |
Сравнительная таблица сериализаторов:
Сериализатор |
Человекочитаемый формат |
Сжатие |
Производительность |
|---|---|---|---|
BINARY_KRYO |
- |
- |
+++ |
BINARY_KRYO_GZIP |
- |
+ |
++ |
BINARY_FST |
- |
- |
++ |
BINARY_FST_GZIP |
- |
+ |
+ |
BINARY_JAVA |
- |
- |
+ |
JSON_GSON |
+ |
- |
- |
Настройка фильтров#
Включить или исключить сущность из процесса репликации можно с помощью аннотации @Standin и установки одного из значений атрибута replication:
ENABLED - включает репликацию для сущности или поля;
DISABLED - исключает сущность или поле из репликации.
Пример:
@Entity
@Standin(replication = Replication.DISABLED)
public class Product {
...
}
По умолчанию репликация включена для всех сущностей и полей. Изменить значение по умолчанию можно с помощью настройки setEnableReplicationByDefault:
configurator.setEnableReplicationByDefault(false); // Отключает репликацию по умолчанию
Также есть возможность настроить вручную белый или черный списки для репликации без использования аннотаций. Если репликация включена по умолчанию, тогда сущности и поля из черного списка будут исключены из репликации. Если репликация выключена по умолчанию, тогда только сущности и поля из белого списка будут участвовать в репликации.
Примеры:
configurator.setEnableReplicationByDefault(true); // Включает репликацию по умолчанию для всех сущностей
configurator.getBlacklist().addEntity("org.example.Product1"); // Исключает из репликации сущность Product1
configurator.getBlacklist().addProperty("org.example.Product2", "attribute"); // Исключает из репликации поле attribute сущности Product2
configurator.setEnableReplicationByDefault(false); // Выключает репликацию по умолчанию для всех сущностей
configurator.getWhitelist().addEntity("org.example.Product1"); // Включает репликацию для сущности Product1
configurator.getWhitelist().addProperty("org.example.Product2", "attribute"); // Включает репликацию для поля attribute сущности Product2
Миграция схемы и модели данных#
Плагин репликации не обеспечивает явным образом обратной совместимости пользовательской прикладной модели данных при ее обновлении. В Прикладном журнале в любой момент времени присутствуют накопленные сообщения об изменениях данных с той моделью, с которой они были сформированы. Несовместимые изменения модели могут привести к ошибкам репликации сообщений, сформированных с предыдущей версией модели. Поэтому пользователь должен обеспечивать обратную совместимость при изменении прикладной модели данных.
Изменения модели нужно производить с учетом следующих возможных последствий:
Добавление сущности или необязательного поля - обратно совместимое изменение. Не приводит к ошибкам репликации. Накопленные сообщения могут быть обработаны как с предыдущей версией, так и с новой.
Добавление обязательного поля - несовместимое изменение. Может привести к ошибкам репликации, что потребует переинициализации резервной БД.
Удаление сущности или поля - несовместимое изменение. Может привести к ошибкам репликации, что потребует переинициализации резервной БД. Можно выполнить в два релиза - в первом релизе исключить поле или сущность из репликации и во втором удалить их.
Переименование сущности или поля - несовместимое изменение, интерпретируется как удаление старой сущности или поля и добавление новых. Можно выполнить в два релиза - в первом добавить новую сущность или поле и исключить старые из репликации, во втором релизе - удалить старую сущность или поле.
Изменение типа поля - несовместимое изменение, может привести к ошибкам репликации, что потребует переинициализации резервной БД.
При изменении модели данных в два релиза нужно убедиться, что все сообщения с предыдущей моделью были обработаны.
Откат модуля к предыдущей версии равнозначен выпуску новой версии с обратными операциями изменения прикладной модели данных.
Таблица обратных операций:
Операция |
Обратная операция |
|---|---|
Добавление поля |
Удаление поля |
Переименование поля |
Переименование поля |
Изменение типа поля |
Изменение типа поля |
Удаление поля |
Добавление поля |
Добавление сущности |
Удаление сущности |
Переименование сущности |
Переименование сущности |
Изменение суперкласса сущности |
Изменение суперкласса сущности |
Удаление сущности |
Добавление сущности |
К примеру, если в версии 2 пользовательского модуля был добавлен новый класс сущности, то откат на версию 1 с точки зрения изменения модели данных будет равнозначен удалению сущности, что приведет к эффектам, описанным для операции удаления сущностей.
Заполнение заголовков журнала#
При необходимости ручного заполнения заголовков контейнера сообщения (журнала) пользователь может воспользоваться настройкой setJournalListener, передав в нее реализацию интерфейса com.sbt.pprb.integration.replication.journal.JournalHeaderHandler. Функция интерфейса handle будет вызываться при отправке сообщения в Прикладной Журнал (APLJ), в аргумент будет передан заголовок контейнера, и пользователь сможет заполнить необходимую информацию.
Пример:
configurator.setHeaderHandler(header -> header.setSourceInfo("test-module"));
Настройка подключения к Прикладному журналу (APLJ), инициализация резервной БД, сверка#
Настройка подключения к Прикладному журнала, включая количество повторных обработок, лежит в зоне ответственности пользователя и выполняется в соответствии с документацией Прикладного журнала.
Механизмы инициализации и сверки (включая требования к БД и схеме данных) – функциональность Прикладного журнала. Для получения инструкций по работе с ними нужно обратиться к документации Прикладного журнала.
Самостоятельное создание снепшота сущности для вектора изменений#
Вектор изменений можно сформировать вручную и самостоятельно отправить его в Прикладной журнал.
Для создания вектора изменений можно использовать класс com.sbt.pprb.integration.changevector.building.snapshot.SnapshotVectorBuilder:
SnapshotVectorBuilder vectorBuilder = new SnapshotVectorBuilder(new HibernateMetadataSource(entityManagerFactory));
CreateEvent createEvent = vectorBuilder.buildCreateEvent(entity);
ChangeVector changeVector = vectorBuilder.makeVector(newArrayList(createEvent), newArrayList(), newArrayList());
Функции buildCreateEvent, buildUpdateEvent и buildDeleteEvent создают соответствующие события изменений, а с помощью функции makeVector можно объединить сформированные события в единый вектор изменений.
Модификация вектора изменений для транзакции#
Модифицировать вектор изменений, сформированный для транзакции, можно с помощью интерфейса EventsCollector.
Получение EventsCollector:
EventsCollector eventsCollector = ChangeVectorPlugin.getInstance().getCollector(entityManager);
Методы EventsCollector:
put- добавляет событие изменения сущности к вектору изменений транзакции.flush- формирует новый набор изменений.clear- очищает вектор изменений транзакции.
Репликация без предварительной выборки сущностей#
Режим репликации без предварительной выборки сущностей позволяет ускорить обработку реплики путем сокращения количества запросов при применении изменений.
Изменения сущностей будут применяться запросом update без предварительной выборки, при этом будет произведен контроль версий.
В случае, если обновление записи в БД не удалось, алгоритм вернется к стандартному режиму для текущей транзакции, и сделает выборку всех сущностей, участвующих в применении.
Включение режима:
configurator.setUseDynamicUpdateWithoutSelect(true);
Метрики#
Плагин не имеет интеграции с системами сбора метрик, но можно использовать подписки на определенные события, чтобы собрать необходимые данные.
Метрика количества заблокированных партиций#
Оформление подписки на получение количества заблокированных ключей партиционирования:
configurator.subscribeLocksMetric(metricParams)
Метод subscribeLocksMetric принимает в качестве аргумента объект класса LocksMetricParams.
Методы LocksMetricParams:
setInterval- устанавливает интервал опроса служебной таблицы в миллисекундах.setMaxRowCount- устанавливает порог количества записей в служебной таблице, при превышении которого метрика не будет собираться. Используется в качестве защиты от «тяжелых» запросов.setCheckRowCountInterval- устанавливает интервал получения количества записей в служебной таблице.setDataSourceMetric- указывает базу данных, из которой будут запрашиваться данные.setListener- устанавливает метод, который будет вызываться после получения количества заблокированных записей.
Данная метрика работает по расписанию и создает запросы к базе данных
Метрика количества обрабатываемых журналов#
Оформление подписки на получение количества обрабатываемых журналов:
configurator.setJournalListener(journalListener);
Метод setJournalListener принимает в качестве аргумента объект класса JournalListener.
Методы JournalListener:
beforeSerialize— устанавливает метод, который будет вызываться перед сериализацией журнала, аргументы метода — тип данных, исходные данные.afterSerialize— устанавливает метод, который будет вызываться после сериализации журнала, аргументы метода — продолжительность сериализации, тип данных, исходные данные, сериализованные данные.beforeSend— устанавливает метод, который будет вызываться перед отправкой журнала, аргументы метода — отправляемый журнал.afterSend— устанавливает метод, который будет вызываться после отправки журнала, аргументы метода — время отправки, отправляемый журнал.onChangeProcessedJournalCount— устанавливает метод, который будет вызываться при начале/завершении обработки журнала, аргументы метода — количество обрабатываемых журналов.
Метрики обработки журналов#
Для оформления подписки на метрики обработки журналов:
Создайте класс, реализующий интерфейс
com.sbt.pprb.integration.replication.metrics.JournalMetricsConsumer:package sbp.hibernate.example.config; import com.sbt.pprb.integration.replication.metrics.JournalMetricsConsumer; public class JournalMetricsConsumerTest implements JournalMetricsConsumer { @Override public void onDeserializeDuration(UUID txId, Duration duration) { } @Override public void onApplyingDuration(UUID txId, Duration duration) { } @Override public void onVectorReceived(long size) { } @Override public void onSendSuccess(UUID txId, Duration sendDuration, long size) { } @Override public void onSendFail(UUID txId, Exception e) { } @Override public void onApplySuccess(String txId) { } @Override public void onApplyFail(String txId, String exception) { } @Override public void onApplyWithWarn(String txId, String message) { } @Override public void onOrderingRetryTime(String txId, Duration duration) { } @Override public void onRetryCount(String txId, int count) { } }Создайте файл
com.sbt.pprb.integration.replication.metrics.JournalMetricsConsumerв ресурсах проектаsrc\main\resources\META-INF\services, в котором укажите полное название класса:com.sbt.pprb.integration.replication.metrics.JournalMetricsConsumerTest.
Методы JournalMetricsConsumer:
onDeserializeDuration— устанавливает метод, который будет вызываться после десериализации журнала, аргументы метода — идентификатор транзакции, продолжительность десериализации.onApplyingDuration— устанавливает метод, который будет вызываться после применения журнала, аргументы метода — идентификатор транзакции, продолжительность применения журнала.onVectorReceived— устанавливает метод, который принимает размер принятого журнала.onSendSuccess— устанавливает метод, который будет вызываться после успешной отправки журнала, аргументы метода — идентификатор транзакции, продолжительность отправки журнала, размер отправленного журнала.onSendFail— устанавливает метод, который будет вызываться после неуспешной попытки отправки журнала, аргументы метода — идентификатор транзакции, ошибка.onApplySuccess— устанавливает метод, который будет вызываться после успешного применения журнала, аргументы метода — идентификатор транзакции.onApplyFail— устанавливает метод, который будет вызываться после успешного применения журнала, аргументы метода — идентификатор транзакции, сообщение с ошибкой.onApplyWithWarn— устанавливает метод, который будет вызываться после успешного применения журнала, аргументы метода — идентификатор транзакции, сообщение предупреждения.onOrderingRetryTime— устанавливает метод, который принимает продолжительность ожидания зависимых журналов или журналов разблокировки.onRetryCount— устанавливает метод, который будет принимать количество попыток применить журнал.
Служебные таблицы#
Работа с отдельными схемами служебных таблиц#
Для работы с отдельной схемой для служебных таблиц зарегистрируйте bootstrap-сервис:
Создайте класс, реализующий интерфейс
com.sbt.pprb.integration.hibernate.adapter.TableSchemaProvider:package sbp.hibernate.example.config; import com.sbt.pprb.integration.hibernate.adapter.TableSchemaProvider; public class TableSchemaProviderTest implements TableSchemaProvider { @Override public String getMainSchema() { return "main_schema"; } @Override public String getStandInSchema() { return "standin_schema"; } }Создайте файл
com.sbt.pprb.integration.hibernate.adapter.TableSchemaProviderв ресурсах проектаsrc\main\resources\META-INF\services, в котором укажите полное название класса:sbp.hibernate.example.config.TableSchemaProviderTest.
Чистка служебных таблиц#
Чистка служебных таблиц производится по расписанию. Для работы необходимо настроить необходимые параметры конфигуратора:
configurator.setTableCleanerParams(tableCleanerParams);
Метод setTableCleanerParams принимает в качестве аргумента объект класса TableCleanerParams.
Методы TableCleanerParams:
setInterval— устанавливает интервал между очистками служебных таблиц.setMaxAge— устанавливает максимальное время жизни записей в служебных таблицах без изменений.setMaxDeleteRowCount— устанавливает максимальное количество записей за одну итерацию.
Работа без StandIn базы#
Данный режим поддерживается в режиме подтверждения TRANSPORT.
Для работы в данном режиме необходимо:
исключить из конфигурации настройку:
configurator.setStandinDataSource(standinDataSource);либо установить значение — null:
configurator.setStandinDataSource(null);
В этом режиме требуется подписка только на журнал сообщений ULCK.
Пример подписки:
@Bean
@Primary
public SubscriptionService confSubscription() {
SubscriptionServiceImpl subscriptionService = new SubscriptionServiceImpl();
subscriptionService.subscribe(
new ZoneId(zoneId),
new DataType("ULCK"),
new PluginCode("EXPORT_FUNC_SI_LCK"),
new JournalConsumerImpl()
);
return subscriptionService;
}