pgq. Организация монопольных очередей сообщений#

Версия: 3.5.1.

В исходном дистрибутиве установлено по умолчанию: нет.

Связанные компоненты: pgqd.

Схема размещения: pgq.

Описание#

Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq для postgres и демона pgqd.

При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3, может быть скорректировано пользователем до создания очереди.

Функции#

Функция

Входные параметры

Возвращаемое значение

Описание

create_queue

text

int

Создает новую очередь с заданным именем

drop_queue

text, bool

integer

Удаляет очередь и все связанные с ней таблицы

drop_queue

text

integer

Удаляет очередь и все связанные с ней таблицы, при этом ни один из процессов не должен зависеть от очереди

set_queue_config

text, text, text

integer

Задает конфигурацию для конкретной очереди (возвращает 0, если событие уже в очереди, в противном случае возвращает 1)

insert_event

text, text, text

bigint

Добавляет событие в очередь

insert_event

text, text, text, text, text, text, text

bigint

Добавляет в очередь событие со всеми дополнительными полями

current_event_table

text

text

Возвращает активное событие таблицы для конкретной очереди. Событие может быть добавлено в нее через функцию, например, при вызове COPY

register_consumer

text, text

Подписывает «потребителя» (consumer) на очередь, после чего он будет видеть все события в очереди

register_consumer

text, text, bigint

integer

Расширяет регистрацию возможностью задать tick_id

register_consumer_at

text, text, bigint

integer

Используется для отложенной регистрации

unregister_consumer

text, text

integer

Исключает «потребителя» (consumer) из очереди

next_batch_info

text, text, int8, int8, int8, timestamptz, timestamptz, int8, int8

int8

Переходит к следующему элементу очереди, NULL при отсутствии

next_batch

text, text

int8

Возвращает идентификатор элемента очереди, NULL при отсутствии

next_batch_custom

text, text, interval, int4, interval, int8, int8, int8, timestamptz, timestamptz, int8, int8

int8

Возвращает идентификатор элемента очереди

get_batch_events

bigint, bigint, timestamptz, bigint, int4, text, text, text, text, text, text

setof record

Возвращает список всех элементов очереди

get_batch_cursor

bigint, text, int4, text, bigint, timestamptz, bigint, int4, text, text, text, text, text, text

setof record

Возвращает список событий

event_retry

bigint, bigint, timestamptz

integer

Помещает событие в очередь для повторного вызова в дальнейшем

finish_batch

bigint

integer

Закрывает блок сообщений

get_queue_info

text, integer, integer, interval, timestamptz, boolean, boolean, integer, interval, interval, interval, float8, bigint, bigint

setof record

Выводит информацию о всех очередях

get_consumer_info

text, text, interval, bigint, bigint, bigint, bigint

setof record

Выводит информацию о «потребителях» (consumer) во всех очередях

Доработка#

Доработка не проводилась.

Ограничения#

Ограничения отсутствуют.

Установка#

Установка расширения (ALT SP 8, Astra Linux):

sudo apt-get install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y

Для остальных ОС:

sudo dnf install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y

Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:

CREATE EXTENSION pgq;

Настройка#

Настройка менеджера очередей pgqd#

Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).

При запуске сервиса происходит:

  • подключение к сконфигурированной сущности;

  • поиск установленного расширения по всем базам данным, доступным для подключения;

  • вызов функции pgq.ticker() c заданной периодичностью;

  • вызов функций pgq.maint_rotate_tables_step1(), pgq.maint_rotate_tables_step2() c заданной периодичностью;

  • вызов функции pgq.maint_retry_events() с заданной периодичностью.

Изменение конфигурации pgqd производится с использованием конфигурационного файла pgqd.ini:

[pgqd]
# where to log
logfile = ~/log/pgqd.log
# pidfile
pidfile = ~/pid/pgqd.pid
 
## optional parameters ##
# libpq connect string without dbname=
#base_connstr =
# startup db to query other databases
#initial_database = template1
# limit ticker to specific databases
#database_list = feat_pgq
# log into syslog
syslog = 1
syslog_ident = pgqd
 
## optional timeouts ##
# how often to check for new databases
check_period = 60
# how often to flush retry queue
retry_period = 30
# how often to do maintentance
#maint_period = 120
# how often to run ticker
ticker_period = 1

Описание конфигурационных параметров:

Конфигурационный параметр

Значение по умолчанию

Описание

logfile

~/log/pgqd.log

Путь к log-файлу

pidfile

~/pid/pgqd.pid

Путь к pid-файлу

base_connstr

-

Строка подключения к сущности (libpq syntax). Не должна включать имя базы данных

initial_database

template1

База данных для автоматического подключения (при отсутствии параметра database_list)

database_list

-

Список баз данных для подключения. Если не задан - производится автоматический поиск

syslog

1

Признак логирования действий в syslog

syslog_ident

pgqd

Идентификатор syslog

check_period

60

Периодичность поиска новых баз данных (в случае автоматического поиска), сек

retry_period

30

Периодичность поиска сообщений, повторно добавляемых в очередь, сек

maint_period

120

Периодичность проведения регламентных операций, сек

ticker_period

1

Периодичность синхронизации очередей, сек

Доступные параметры командной строки при использовании утилиты:

./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
  -v        Increase verbosity
  -q        No output to console
  -d        Daemonize
  -h        Show help
  -V        Show version
 --ini      Show sample config file
  -s        Stop - send SIGINT to running process
  -k        Kill - send SIGTERM to running process
  -r        Reload - send SIGHUP to running process

Для окончания конфигурирования сервиса требуется его перезагрузка (reload):

sudo systemctl reload pgqd

Настройка расширения pgq#

  1. Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:

    SET ROLE tuz_pgq_admin;
    -- <n> - number of desired tables per queue 
    ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;
    
  2. Настройте остальные параметры очереди:

    SET ROLE tuz_pgq_admin;
    -- <queue_param> one of:
    --   queue_ticker_max_count
    --   queue_ticker_max_lag
    --   queue_ticker_idle_period
    --   queue_ticker_paused
    --   queue_rotation_period
    --   queue_external_ticker
    SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>');
    
  3. Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.get_queue_info();
    
  4. Проверьте количество сообщений, ожидающих обработки:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']);
    
  5. Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:

    SET ROLE tuz_pgq_admin;
    UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';
    

Примечание:

Изменение типа данных для сообщений (на json/b или xml) не рекомендуется. Основная функция для добавления сообщений pgq.insert_event_raw написана на языке Си, скомпилирована в расширение и не предусматривает изменение состава полей.

Изменение позиции повторной отправки сообщения (event retry) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди. Повторная приоритизация возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.

Проверка работоспособности#

При штатной работе менеджера очередей:

  • Наблюдается увеличение значений поля last_tick_id.

  • Проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.

Примечание:

При нештатной работе менеджера очередей наблюдаются одинаковые значение поля last_tick_id между итерациями, а так же возрастание значений в поле ticker_lag.

Использование модуля#

Обозначения переменных

Далее в примерах имеются следующие обозначения:

  • <name_queue> - наименование очереди;

  • <type_event> - тип события;

  • <event_information> - информация о событии;

  • <name_consumer> - наименование потребителя;

  • <id_event> - идентификатор события;

  • <id_batch> - идентификатор последовательности;

  • <second_until_next_attempt> - количество секунд до повторной попытки выполнить действие.

Создание очереди:

 select * from pgq.create_queue(<name_queue> text);

Добавление в очередь сообщения:

select * from pgq.insert_event(<name_queue> text, <type_event> text, <event_information> text);

Создание «потребителя» (consumer):

select * from pgq.register_consumer(<name_queue> text, <name_consumer> text);

Внимание!

Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.

Получение идентификатора (ID) блока последовательных сообщений в очереди:

select * from pgq.next_batch(<name_queue> text, <name_consumer> text);

Получение всех событий блока последовательных сообщений:

select * from pgq.get_batch_events(<id_batch> bigint);

Примечание:

Возвращенное значение может быть пустым.

Повторное помещение события в очередь:

select * from pgq.event_retry(<id_batch> bigint, <id_event> bigint, <second_until_next_attempt> integer);

Приостановление очереди на прием:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');

Возобновление работы очереди сообщений:

SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');

Ссылки на документацию разработчика#

Функции расширения pgq: https://pgq.github.io/extension/pgq/files/external-sql.html.