pgq. Организация монопольных очередей сообщений#
Версия: 3.5.1.
В исходном дистрибутиве установлено по умолчанию: нет.
Связанные компоненты:
pgqd.Схема размещения:
pgq.
Описание#
Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq для postgres и демона pgqd.
При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3, может быть скорректировано пользователем до создания очереди.
Функции#
Функция |
Входные параметры |
Возвращаемое значение |
Описание |
|---|---|---|---|
|
|
|
Создает новую очередь с заданным именем |
|
|
|
Удаляет очередь и все связанные с ней таблицы |
|
|
|
Удаляет очередь и все связанные с ней таблицы, при этом ни один из процессов не должен зависеть от очереди |
|
|
|
Задает конфигурацию для конкретной очереди (возвращает 0, если событие уже в очереди, в противном случае возвращает 1) |
|
|
|
Добавляет событие в очередь |
|
|
|
Добавляет в очередь событие со всеми дополнительными полями |
|
|
|
Возвращает активное событие таблицы для конкретной очереди. Событие может быть добавлено в нее через функцию, например, при вызове |
|
|
Подписывает «потребителя» (consumer) на очередь, после чего он будет видеть все события в очереди |
|
|
|
|
Расширяет регистрацию возможностью задать |
|
|
|
Используется для отложенной регистрации |
|
|
|
Исключает «потребителя» (consumer) из очереди |
|
|
|
Переходит к следующему элементу очереди, |
|
|
|
Возвращает идентификатор элемента очереди, |
|
|
|
Возвращает идентификатор элемента очереди |
|
|
|
Возвращает список всех элементов очереди |
|
|
|
Возвращает список событий |
|
|
|
Помещает событие в очередь для повторного вызова в дальнейшем |
|
|
|
Закрывает блок сообщений |
|
|
|
Выводит информацию о всех очередях |
|
|
|
Выводит информацию о «потребителях» (consumer) во всех очередях |
Доработка#
Доработка не проводилась.
Ограничения#
Ограничения отсутствуют.
Установка#
Установка расширения (ALT SP 8, Astra Linux):
sudo apt-get install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y
Для остальных ОС:
sudo dnf install /usr/pangolin-6.3/3rdparty/pgq/pangolin-pgq-3.5.1-{OS}.x86_64.rpm -y
Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:
CREATE EXTENSION pgq;
Настройка#
Настройка менеджера очередей pgqd#
Менеджер очереди является системным сервисом, предоставляющим элементы синхронизации (tick).
При запуске сервиса происходит:
подключение к сконфигурированной сущности;
поиск установленного расширения по всем базам данным, доступным для подключения;
вызов функции
pgq.ticker()c заданной периодичностью;вызов функций
pgq.maint_rotate_tables_step1(),pgq.maint_rotate_tables_step2()c заданной периодичностью;вызов функции
pgq.maint_retry_events()с заданной периодичностью.
Изменение конфигурации pgqd производится с использованием конфигурационного файла pgqd.ini:
[pgqd]
# where to log
logfile = ~/log/pgqd.log
# pidfile
pidfile = ~/pid/pgqd.pid
## optional parameters ##
# libpq connect string without dbname=
#base_connstr =
# startup db to query other databases
#initial_database = template1
# limit ticker to specific databases
#database_list = feat_pgq
# log into syslog
syslog = 1
syslog_ident = pgqd
## optional timeouts ##
# how often to check for new databases
check_period = 60
# how often to flush retry queue
retry_period = 30
# how often to do maintentance
#maint_period = 120
# how often to run ticker
ticker_period = 1
Описание конфигурационных параметров:
Конфигурационный параметр |
Значение по умолчанию |
Описание |
|---|---|---|
|
|
Путь к |
|
|
Путь к |
|
- |
Строка подключения к сущности ( |
|
|
База данных для автоматического подключения (при отсутствии параметра |
|
- |
Список баз данных для подключения. Если не задан - производится автоматический поиск |
|
|
Признак логирования действий в |
|
|
Идентификатор |
|
|
Периодичность поиска новых баз данных (в случае автоматического поиска), сек |
|
|
Периодичность поиска сообщений, повторно добавляемых в очередь, сек |
|
|
Периодичность проведения регламентных операций, сек |
|
|
Периодичность синхронизации очередей, сек |
Доступные параметры командной строки при использовании утилиты:
./pgqd --help
usage: pgq-ticker [switches] config.file
Switches:
-v Increase verbosity
-q No output to console
-d Daemonize
-h Show help
-V Show version
--ini Show sample config file
-s Stop - send SIGINT to running process
-k Kill - send SIGTERM to running process
-r Reload - send SIGHUP to running process
Для окончания конфигурирования сервиса требуется его перезагрузка (reload):
sudo systemctl reload pgqd
Настройка расширения pgq#
Настройте количество таблиц очереди перед ее созданием, выполнив следующий запрос:
SET ROLE tuz_pgq_admin; -- <n> - number of desired tables per queue ALTER TABLE pgq.queue ALTER COLUMN queue_ntables SET DEFAULT <n>;Настройте остальные параметры очереди:
SET ROLE tuz_pgq_admin; -- <queue_param> one of: -- queue_ticker_max_count -- queue_ticker_max_lag -- queue_ticker_idle_period -- queue_ticker_paused -- queue_rotation_period -- queue_external_ticker SELECT * FROM pgq.set_queue_config('<queue_name>','<queue_param>','<param_value>');Проверьте наполнение очереди (количество элементов, скорость наполнения, работоспособность расширения):
SET ROLE tuz_pgq_admin; SELECT * FROM pgq.get_queue_info();Проверьте количество сообщений, ожидающих обработки:
SET ROLE tuz_pgq_admin; SELECT * FROM get_consumer_info(['<queue_name>','<consumer_name>']);Настройте регламентные операции для таблиц очередей - добавьте дополнительные обработки для таблиц очередей:
SET ROLE tuz_pgq_admin; UPDATE pgq.queue SET queue_extra_maint = ARRAY['<func1>','<func2>'] WHERE queue_name='<queue_name>';
Примечание:
Изменение типа данных для сообщений (на
json/bилиxml) не рекомендуется. Основная функция для добавления сообщенийpgq.insert_event_rawнаписана на языке Си, скомпилирована в расширение и не предусматривает изменение состава полей.Изменение позиции повторной отправки сообщения (
event retry) не предусмотрена. Повторно отправляемое сообщение помещается в конец очереди. Повторная приоритизация возможна созданием дополнительной очереди настройки приоритетов на уровне бизнес-логики.
Проверка работоспособности#
При штатной работе менеджера очередей:
Наблюдается увеличение значений поля
last_tick_id.Проводятся периодические подключения к БД, что может приводить к росту объема логов. После установки функциональности рекомендуется пересмотреть настройки аудита для пользователя, под которым будет производиться работа менеджера очередей.
Примечание:
При нештатной работе менеджера очередей наблюдаются одинаковые значение поля
last_tick_idмежду итерациями, а так же возрастание значений в полеticker_lag.
Использование модуля#
Обозначения переменных
Далее в примерах имеются следующие обозначения:
<name_queue>- наименование очереди;<type_event>- тип события;<event_information>- информация о событии;<name_consumer>- наименование потребителя;<id_event>- идентификатор события;<id_batch>- идентификатор последовательности;<second_until_next_attempt>- количество секунд до повторной попытки выполнить действие.
Создание очереди:
select * from pgq.create_queue(<name_queue> text);
Добавление в очередь сообщения:
select * from pgq.insert_event(<name_queue> text, <type_event> text, <event_information> text);
Создание «потребителя» (consumer):
select * from pgq.register_consumer(<name_queue> text, <name_consumer> text);
Внимание!
Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.
Получение идентификатора (ID) блока последовательных сообщений в очереди:
select * from pgq.next_batch(<name_queue> text, <name_consumer> text);
Получение всех событий блока последовательных сообщений:
select * from pgq.get_batch_events(<id_batch> bigint);
Примечание:
Возвращенное значение может быть пустым.
Повторное помещение события в очередь:
select * from pgq.event_retry(<id_batch> bigint, <id_event> bigint, <second_until_next_attempt> integer);
Приостановление очереди на прием:
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');
Возобновление работы очереди сообщений:
SET ROLE tuz_pgq_admin;
SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');
Ссылки на документацию разработчика#
Функции расширения pgq: https://pgq.github.io/extension/pgq/files/external-sql.html.