pgq_coop. Организация кооперативных очередей сообщений#
Версия: 3.4.
В исходном дистрибутиве установлено по умолчанию: нет.
Связанные компоненты:
pgq.Схема размещения:
pgq_coop.
Описание#
Опциональное расширение, реализованное на языке plpgsql и предназначенное для организации кооперативных очередей. Для одного генератора сообщений могут присутствовать несколько получателей.
«subconsumer» — актор очереди сообщений, функционально являющийся частью потребителя consumer, далее субпотребитель.
Функции#
Расширение предоставляет следующие функции:
Функция |
Входные параметры |
Описание |
|---|---|---|
|
|
Завершение обработки сеанса сообщений. Решение принимается солидарно для всех субпотребителей. Параметры функции:
|
|
|
Получение информации о сеансе сообщений для субпотребителя. Аналог функции Параметры функции:
|
|
|
Расширенное получение информации о сеансе сообщений. Аналог функции Параметры функции:
|
|
|
Регистрация субпотребителя в очереди. Параметры функции:
|
|
|
Отмена регистрации субпотребителя в очереди. Параметры функции:
|
|
- |
Версия расширения. |
Доработка#
Доработка не проводилась.
Ограничения#
Требуется предварительная установка расширения pgq.
Установка#
sudo dnf install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
sudo yum install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
sudo apt install pangolin-pgq-coop-{version_component}_amd64.deb
sudo apt-get install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
Пример заполненной команды:
cd distributive
sudo apt-get -y install pangolin-pgq-coop-3.4-rhel8.7.x86_64.rpm
Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:
CREATE EXTENSION pgq_coop;
Настройка#
Настройка не требуется.
Использование модуля#
Работа с кооперативной очередью#
Создайте расширение
pgq_coop:SET ROLE db_admin; CREATE EXTENSION pgq_coop; \dx pgq_coopРезультатом выполнения — вывод версии расширения, например:
List of installed extensions ┌──────────┬─────────┬────────────┬─────────────────────────────────────┐ │ Name │ Version │ Schema │ Description │ ├──────────┼─────────┼────────────┼─────────────────────────────────────┤ │ pgq_coop │ 3.4 │ pg_catalog │ Cooperative queue consuming for PgQ │ └──────────┴─────────┴────────────┴─────────────────────────────────────┘ (1 row)Cоздайте очередь
coop_q:SET ROLE tuz_pgq_admin; SELECT * FROM pgq.create_queue('coop_q');Создайте для очереди
coop_qпотребителя и два субпотребителя, связанных с ним (subconsumer):SET ROLE tuz_pgq_reader; SELECT pgq.register_consumer('coop_q','test_consumer'); SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer1'); SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer2');Заполните очередь десятью блоками по 10000 сообщений:
SET ROLE tuz_pgq_writer; DO $$ DECLARE i int4; BEGIN FOR i IN 1..10 LOOP PERFORM pgq.insert_event('coop_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id; END LOOP; END $$; \watch 0Получите статистическую информацию по очереди и потребителям:
SET ROLE tuz_pgq_reader; SELECT * FROM pgq.get_queue_info()\gx SELECT * FROM pgq.get_consumer_info()\gxРезультат выполнения команды — получение таблиц вида:
SELECT * FROM pgq.get_queue_info()\gx ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐ │ queue_name │ coop_q │ │ queue_ntables │ 3 │ │ queue_cur_table │ 0 │ │ queue_rotation_period │ 02:00:00 │ │ queue_switch_time │ 2023-11-16 13:08:57.838897+03 │ │ queue_external_ticker │ f │ │ queue_ticker_paused │ f │ │ queue_ticker_max_count │ 500 │ │ queue_ticker_max_lag │ 00:00:03 │ │ queue_ticker_idle_period │ 00:01:00 │ │ ticker_lag │ 00:00:04.829513 │ │ ev_per_sec │ 2041.7830216193786 │ │ ev_new │ 0 │ │ last_tick_id │ 20 │ └──────────────────────────┴───────────────────────────────┘ SELECT * FROM pgq.get_consumer_info()\gx ┌─[ RECORD 1 ]───┬─────────────────────────────────┐ │ queue_name │ coop_q │ │ consumer_name │ test_consumer │ │ lag │ 00:00:56.706897 │ │ last_seen │ 00:00:55.567714 │ │ last_tick │ 4 │ │ current_batch │ NULL │ │ next_tick │ NULL │ │ pending_events │ 119999 │ ├─[ RECORD 2 ]───┼─────────────────────────────────┤ │ queue_name │ coop_q │ │ consumer_name │ test_consumer.test_subconsumer1 │ │ lag │ NULL │ │ last_seen │ 00:00:55.562226 │ │ last_tick │ NULL │ │ current_batch │ NULL │ │ next_tick │ NULL │ │ pending_events │ NULL │ ├─[ RECORD 3 ]───┼─────────────────────────────────┤ │ queue_name │ coop_q │ │ consumer_name │ test_consumer.test_subconsumer2 │ │ lag │ NULL │ │ last_seen │ 00:00:54.789057 │ │ last_tick │ NULL │ │ current_batch │ NULL │ │ next_tick │ NULL │ │ pending_events │ NULL │ └────────────────┴─────────────────────────────────┘Пример автоматизированного конвейра получения и обработки сеансов:
SET ROLE tuz_pgq_reader; DO $$ DECLARE v_batch_id int8; v_event_cnt int8; BEGIN -- subconsumer 1 SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer1') INTO v_batch_id; IF v_batch_id ISNULLNOTNULL THEN RETURN; END IF; SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt; RAISE NOTICE 'Got % events from batch for subconsumer1 %', v_event_cnt::text,v_batch_id::TEXT; PERFORM pgq_coop.finish_batch(v_batch_id); END IF; --subconsumer 2 SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer2') INTO v_batch_id; IF v_batch_id ISNULLNOTNULL THEN RETURN; END IF; SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt; RAISE NOTICE 'Got % events from batch for subconsumer2 %', v_event_cnt::text,v_batch_id::TEXT; PERFORM pgq_coop.finish_batch(v_batch_id); END IF; END; $$; \watch 1При успешном результате наблюдается последовательное увеличение
batch_idи периодическое получение сообщений до исчерпания очереди.Получите статистическую информацию по очереди и потребителям:
SET ROLE tuz_pgq_reader; SELECT * FROM pgq.get_queue_info()\gx SELECT * FROM pgq.get_consumer_info()\gxРезультат выполнения команды — получение таблиц вида:
SELECT * FROM pgq.get_queue_info()\gx ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐ │ queue_name │ coop_q │ │ queue_ntables │ 3 │ │ queue_cur_table │ 0 │ │ queue_rotation_period │ 02:00:00 │ │ queue_switch_time │ 2023-11-15 14:18:03.430572+03 │ │ queue_external_ticker │ f │ │ queue_ticker_paused │ f │ │ queue_ticker_max_count │ 500 │ │ queue_ticker_max_lag │ 00:00:03 │ │ queue_ticker_idle_period │ 00:01:00 │ │ ticker_lag │ 00:00:58.33156 │ │ ev_per_sec │ 11.074713891632932 │ │ ev_new │ 0 │ │ last_tick_id │ 49 │ └──────────────────────────┴───────────────────────────────┘ SELECT * FROM pgq.get_consumer_info()\gx ┌─[ RECORD 1 ]───┬─────────────────────────────────┐ │ queue_name │ coop_q │ │ consumer_name │ test_consumer │ │ lag │ 00:04:46.626124 │ │ last_seen │ 00:00:51.636231 │ │ last_tick │ 46 │ │ current_batch │ NULL │ │ next_tick │ NULL │ │ pending_events │ 0 │ ├─[ RECORD 2 ]───┼─────────────────────────────────┤ │ queue_name │ coop_q │ │ consumer_name │ test_consumer.test_subconsumer1 │ │ lag │ NULL │ │ last_seen │ 00:00:51.636231 │ │ last_tick │ NULL │ │ current_batch │ NULL │ │ next_tick │ NULL │ │ pending_events │ NULL │ ├─[ RECORD 3 ]───┼─────────────────────────────────┤ │ queue_name │ coop_q │ │ consumer_name │ test_consumer.test_subconsumer2 │ │ lag │ NULL │ │ last_seen │ 00:00:51.636231 │ │ last_tick │ NULL │ │ current_batch │ NULL │ │ next_tick │ NULL │ │ pending_events │ NULL │ └────────────────┴─────────────────────────────────┘Удалите очередь:
SET ROLE tuz_pgq_reader; SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE); SET ROLE tuz_pgq_admin; SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);Результат выполнения команды — очередь удалена.
Прав
tuz_pgq_readerнедостаточно для удаления очереди. Правtuz_pgq_adminдостаточно для удаления очереди.
Ссылки на документацию разработчика#
Функции расширения pgq_coop: https://github.com/pgq/pgq-coop.