Настройка многопоточной репликации#

Для наибольшей скорости записи в базу данных на стороне воркера применителя удалите ограничения (constraints), первичные (primary key) и внешние ключи (foreign key).

Если нужно сохранить ограничения, повысьте скорость записи в БД, установив отложенные ограничения (deferred constraints). Это обеспечит небольшой прирост скорости записи в БД приемника, примерно 3%-5%.

Последовательность выполнения#

  1. Чтобы установить отложенные ограничения, отредактируйте тип ограничений командой ALTER:

alter table ${table_name} alter constraint ${constraint_name} DEFERRABLE INITIALLY DEFERRED;

Или при создании таблицы объявите внешние ключи как DEFERRABLE INITIALLY DEFERRED, если на таблицу нет ссылок из других таблиц по внешним ключам:

CREATE TABLE IF NOT EXISTS source.review
(
id bigint NOT NULL,
check_in_date date,
details character varying(5000) COLLATE pg_catalog."default",
idx integer,
rating integer,
title character varying(255) COLLATE pg_catalog."default",
trip_type integer,
hotel_id bigint,
CONSTRAINT review_pkey PRIMARY KEY (id) deferrable initially deferred,
CONSTRAINT review_hotel_id_fkey FOREIGN KEY (hotel_id)
    REFERENCES source.hotel (id)
    DEFERRABLE INITIALLY DEFERRED
);
  1. Чтобы поменять все внешние ключи в указанной схеме на отложенные, пропишите в клиентском терминале psql, в БД применителя функцию:

CREATE OR REPLACE FUNCTION change_constraints_deferrable(
p_schema_name character varying,
p_def boolean)
RETURNS setof text
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_table_name VARCHAR;
v_constraint_name VARCHAR;
BEGIN
FOR v_table_name IN
    SELECT c.relname FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = p_schema_name AND c.relkind = 'r'
LOOP
        FOR v_constraint_name IN
        SELECT conname FROM pg_constraint
        WHERE contype = 'f'
        AND condeferred != p_def
        AND conrelid = (SELECT c.oid
                FROM pg_class c
                JOIN pg_namespace nsp     ON nsp.oid = C.RELNAMESPACE
                WHERE c.relname = v_table_name and nsp.nspname = p_schema_name )
    LOOP
        EXECUTE 'ALTER TABLE ' || p_schema_name || '.' || v_table_name || ' ALTER CONSTRAINT ' || v_constraint_name || case when p_def then ' DEFERRABLE INITIALLY DEFERRED' else '' end;
        RETURN NEXT v_table_name || '.'|| v_constraint_name;
    END LOOP;
END LOOP;
END;
$BODY$;
  1. Для вызова функции, введите команду SELECT:

select * from change_constraints_deferrable(${schema_name}, true)
  1. После установки отложенных ограничений, включите многопоточное применение. Многопоточное применение увеличит скорость записи в БД в 2-3 раза.

Чтобы включить многопоточное применение, добавьте параметр "apply.thread.count": {число_потоков}, указав число потоков, которое хотите запустить, в интерфейсе консоли управления, во вкладке Соединения, окне Редактирование свойств соединения Target, поле Опции:

Многопоточное применение

Параметры многопоточной репликации#

  • apply.thread.count — количество потоков репликации (default = 1);

  • max.pool.size — количество соединений к БД приемнику в пуле, рекомендуется установить как apply.thread.count + 2 (default = 3);

  • transaction.size — количество векторов изменений в транзакции (default = 500);

  • db.linger.ms — количество миллисекунд на формирование пачки векторов изменений в транзакции (default = 1000).

Если время на сбор транзакции db.linger.ms меньше значения по умолчанию, то количество векторов изменений в одной транзакции transaction.size может быть меньше, чем выставлено в параметре.

Рекомендуемые значения параметров многопоточной репликации#

   {
   "max.pool.size": 20,
   "apply.thread.count": 18,
   "transaction.size": 100, 
   "db.linger.ms": 1000
   }

Результат#

В интерфейсе консоли управления, во вкладке Соединения, окне Редактирование свойств соединения Target, поле Опции отображается число потоков и другие параметры многопоточной репликации.