Привет, Хабр! На связи СберТех — мы создаём Platform V, цифровую платформу Сбера для разработки бизнес-приложений.
В платформу входит более 60 продуктов на базе собственных сборок open source, доработанных до уровня enterprise по функциональности, безопасности, производительности и отказоустойчивости.
В этой статье расскажем про реализацию паттерна Change Data Capture и межкластерной репликации данных в продукте Platform V DataGrid, распределённой in-memory базе данных для высокопроизводительных вычислений. А также об особенностях внедрения функции и вариантах репликации. Написать материал помог наш коллега Николай Ижиков из команды по развитию баз данных на стеке open source.
Что такое Change Data Capture
Представим, что у вас есть база данных с критичными для бизнеса данными и жёсткими SLA по чтению и записи.
В то же время есть компоненты системы, которым необходимо реагировать на изменения в данных, например на уведомления о поступлении нового заказа, изменение данных пользователя и т. д. Таких компонентов много, и со временем их количество только растёт.
Если база уже нагруженная, навешивать на микросервис, создающий заказ, синхронную обработку или писать хранимую процедуру нерационально — увеличим тайминги по ответам, вероятен риск нарушения SLA.
И тут на помощь приходит паттерн Change Data Capture или CDC.
Когда в базу данных вносятся изменения, для ускорения записи и оптимизации операций она пишет журнал изменений (UNDO, REDO логи). В него база данных последовательно записывает дельту, которую вы изменяете.
CDC — это приложение, которое умеет обрабатывать логи изменений, выделять из них события об изменении данных и уведомлять об этом потребителя изменений, реализующего бизнес-логику.
В результате получаем линейные, упорядоченные по времени события изменений данных: что было, что есть, какая операция, над какой таблицей или кэшем была выполнена. Обработка асинхронная, не триггер и не хранимая процедура. Данные закоммитились, приложение, которое их отправляет, продолжило работать дальше, а потребитель через какое-то время получает уведомление о произошедшем изменении.
Как и когда использовать CDC
Стриминг изменений в хранилище данных. У вас есть DWH. Обычно в режиме реального времени данные в неё поступать не должны. Для перекладывания данных можно написать процедуры, которые будут раз в час или в сутки определять дельту и отправлять её в хранилище. С помощью CDC эти же данные можно перекладывать с меньшими задержками.
- Постобработка событий. В системе произошло событие — пользователь зарегистрировался, создал заказ, загрузил новый файл, — и, согласно бизнес-процессу, по новой записи нужно инициировать модерацию или другие действия.
- Аналитика. По поступающим в CDC событиям можно считать аналитику в режиме, близком к реальному времени.
- Логическая репликация. В CDC у нас на руках есть ВСЕ изменения, которые происходят в базе. Для реализации репликации нужно всего лишь надёжно исполнить их на реплике.
CDC в open source database
Дизайн
При любой доработке сложной системы, к которой, очевидно, относится распределённая СУБД, всегда есть риск что-то сломать. Лучший выход — делать новую фичу, вообще не трогая существующие.
Поэтому, проектируя CDC на базе Ignite, команда решила, что ignite-cdc должен выступать как отдельный java-процесс, не влияющий на ноду Ignite.
Ignite в persistence-режиме, как и любая классическая СУБД, записывает изменения в WAL (Write-Ahead Log). WAL — бинарный файл, содержащий изменения, дельты, которые мы периодически пишем в основную память (page memory).
Время от времени WAL-сегмент переходит в архив. Ignite-cdc видит, что появился архивный WAL-сегмент, и обрабатывает его.
Обработка — уведомление потребителя об изменениях. Есть public API для потребителя, но можно написать и свой.
Важно, что при этом нет перерасхода места на диске: WAL-архив — это существующая функциональность, которая нужна для восстановления после сбоев. Ignite-cdc обрабатывает ровно те же сегменты, никаких новых данных на диске не появляется.
Следующий важный момент — возможность сохранять состояние чтения. Ignite-cdc — отдельный процесс, который может падать. Нужно реализовать возможность фиксации состояния потребителя каждый раз, когда он решил, что данные обработаны и можно сохраниться. При падении обработка будет продолжена с места последнего commit-а. К счастью, поддерживать это довольно просто: нужно всего лишь сохранять указатель на том месте в сегменте, на котором чтение остановилось.
Из возможности сохранять состояние следует возможность сделать fail-fast-приложение. При любых проблемах Ignite-cdc падает. Предполагается, что поднимать его будут с помощью ОС-механизмов.
На уровне ноды всё выглядит вот так:
Есть небольшая тонкость: WAL-архив не бесконечный, Ignite складывает в архив столько сегментов, сколько было указано в настройках. При архивации n+1 сегмента самый старый удаляется.
Чтобы избежать ситуаций, когда CDC затормозил и не обработал уже удалённый сегмент, архивный сегмент hard-link’ом переносится в папку, с которой работает только Ignite-cdc.
Если удалим данные из архива, файл останется в папке для СDC, и данные будут доступны.
Если Ignite-cdc обработал сегмент, его можно будет сразу же удалить. Данные исчезнут с диска, когда оба hard-link’а будут удалены.
Приложению понадобятся метрики. API уже есть в Ignite, и его нужно переиспользовать.
API и настройки
Для настройки CDC есть три параметра, которые нужно настроить на уровне ноды.
public class DataStorageConfiguration {
long walForceArchiveTimeout;
String cdcWalPath;
}
public class DataRegionConfiguration implements Serializable {
boolean cdcEnabled;
}
Здесь:
- cdcWalPath — путь к папке, где складываются WAL-сегменты для CDC;
- cdcEnabled — включает CDC для DataRegion’а;
- walForceArchiveTimeout — таймаут принудительной архивации сегмента: даже если сегмент заполнен не полностью, по таймауту он будет архивирован и станет доступным для CDC.
С walForceArchiveTimeout есть тонкость. WAL-архив работает быстро за счёт того, что он является memory-mapped file. Это позволяет фактически писать не на диск, а в память для того, чтобы операционная система сбросила файл или мы могли сделать это вручную, когда сегмент будет заполнен.
Запись на диск — дорогая операция, в момент которой производительность ноды снижается, поэтому, с одной стороны, запись нужно делать как можно реже. С другой — CDC узнаёт об изменениях после архивации сегмента, поэтому запись нужно делать как можно чаще. Противоречие :)
Решить его можно, выбирая таймаут согласно требованиям приложения.
Теперь самое интересное — сonsumer, слушатель, который позволяет узнать и обработать изменения:
public interface CdcConsumer {
public void start(MetricRegistry mreg);
public boolean onEvents(Iterator<CdcEvent> events);
public void onTypes(Iterator<BinaryType> types);
public void onMappings(Iterator<TypeMapping> mappings);
public void stop();
}
- start, stop — для инициализации и остановки;
- onEvents — callback для обработки изменений: вернули true — состояние коммитнулось;
- onTypes, onMappings — callback’и для обработки изменений метаинформации о типах.
Что доступно в событии:
public interface CdcEvent extends Serializable {
public Object key();
@Nullable public Object value();
public boolean primary();
public int partition();
public CacheEntryVersion version();
public int cacheId();
}
- key, value — данные: value может быть null, если событие по remove’у;
- primary — событие произошло на primary или backup;
- partition — номер партиции, необходим для распределения нагрузки в соответствии с существующими в Ignite партициями;
- version — версия entry;
- cacheId — идентификатор кэша.
Таким образом, у нас есть приложение, которое в асинхронном виде получает уведомления обо всех изменениях всех данных внутри кластера Ignite. Теперь на основе этой функциональности мы можем реализовать как необходимые бизнес-функции, так и логическую репликацию.
Логическая репликация с помощью CDC
Под физической репликацией в данной статье я понимаю перенос между экземплярами БД внутреннего представления данных: страниц памяти и т. д.
Под логической — выделение потока изменений из базы-источника и его воспроизведение в базе-приёмнике.
CDC позволяет реализовать именно логическую репликацию.
В Ignite есть поддержка двух схем: Ignite to Ignite и Ignite to Kafka.
Ignite to Ignite
Внутри Ignite-cdc работает IgniteToIgniteCdcStreamer, кстати, доступный из коробки. Это consumer, который внутри себя поднимает клиентскую ноду Ignite, коннектится к кластеру-приёмнику и, получая изменения, отправляет почти обычную операцию put в кластер-приёмник.
Если кластер-источник недоступен, например из-за упавшей ноды, Ignite-cdc будет вечно ждать, пока нода не запустится. Новые данные не поступят, и процесс обработает те, которые были сгенерированы ещё живой нодой.
Если упал Ignite-cdc, то, во-первых, на всех остальных нодах он будет жив. Во-вторых, через некоторое время операционная система его перезапустит, CDC посмотрит, какие изменения он обработал, и продолжит отправлять их в соседний кластер.
Если потерялся соседний кластер или сетевая связность, Ignite-cdc также упадёт, а после перезапуска снова пойдёт в кластер-приёмник. Если кластер недоступен — падение. Если доступен — отлично, CDC начнёт отправлять в него изменения, которые были накоплены в WAL на диске. Диск является буфером изменений, которые будут копиться до тех пор, пока не получится их обработать и отправить в нужную точку.
Ignite to Kafka
Это вариант репликации для ситуаций, когда кластеры Ignite не видят друг друга, нужно использовать Kafka в качестве транспорта, или если есть несколько читателей событий.
Схема практически такая же: для обработки событий используется стример IgniteToKafkaCdcStreamer. Он раскладывает данные по партициям Kafka в соответствии с партициями Ignite.
На стороне приёмника есть приложение kafka-to-ignite — оно читает данные из Kafka и кладёт их в принимающий кластер Ignite.
Conflict resolver
Подошли к самому интересному: что произойдёт, если один ключ будет изменён на обоих кластерах?
Ответ — сработает conflict resolver. Это интерфейс, который определяет, какие именно данные должны попасть в кластер. Он может взять «старое», «новое» значение или выполнить merge.
СDC-extension предоставляет дефолтную имплементацию, но можно реализовать и свою. При этом стоит отметить, что правильного решения при конфликтах изменений не существует. Не зная ничего о данных, невозможно точно определить, какое изменение правильное, а какое нет.
Ключевые свойства дефолтной имплементации:
- Если изменение произошло на «локальном» кластере, оно выигрывает.
- Изменения с одного и того же кластера сравниваются по версии. Изменение с большей версией выигрывает.
- Если указано поле для сравнения, записи сравниваются по нему.
- Если всё предыдущее не сработало, новая запись отбрасывается. Данные разъезжаются, в логах warning, а вам нужно думать, что делать дальше.
Заключение
Внедрение паттерна CDC позволило добавить востребованную функциональность для реализации событийных подписок и создания реплик без влияния на производительность ядра самой базы данных.