pgq_coop. Организация кооперативных очередей сообщений#

Версия: 3.4.

В исходном дистрибутиве установлено по умолчанию: нет.

Связанные компоненты: pgq.

Схема размещения: pgq_coop.

Описание#

Опциональное расширение, реализованное на языке plpgsql и предназначенное для организации кооперативных очередей. Для одного генератора сообщений могут присутствовать несколько получателей.

«subconsumer» — актор очереди сообщений, функционально являющийся частью потребителя consumer, далее субпотребитель.

Функции#

Расширение предоставляет следующие функции:

Функции расширения pgq_coop#

Функция

Входные параметры

Описание

pgq_coop.finish_batch

batch_id (INT8)

Завершение обработки сеанса сообщений. Решение принимается солидарно для всех субпотребителей.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений

pgq_coop.next_batch

queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
dead_interval (INTERVAL)

Получение информации о сеансе сообщений для субпотребителя. Аналог функции pgq.next_batch.

Параметры функции:

  • batch_id — идентификатор сеанса сообщений;

  • queue_name — имя очереди;

  • consumer_name — имя потребителя;

  • subconsumer_name — имя субпотребителя;

  • dead_interval — время, после которого сеанс считается «зависшим»

pgq_coop.next_batch_custom

queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
min_lag (INTERVAL)
min_count (INT4)
min_interval (INTERVAL)
dead_interval (INTERVAL)

Расширенное получение информации о сеансе сообщений. Аналог функции pgq.next_batch_custom.

Параметры функции:

  • queue_name — имя очереди;

  • consumer_name — имя потребителя;

  • subconsumer_name — имя субпотребителя;

  • min_lag — минимальная задержка для событий;

  • min_count — минимальное количество событий в сеансе сообщений;

  • min_interval — минимальный интервал между событиями;

  • dead_interval — время, после которого сеанс считается «зависшим»

pgq_coop.register_subconsumer

queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)

Регистрация субпотребителя в очереди.

Параметры функции:

  • queue_name — имя очереди;

  • consumer_name — имя потребителя;

  • subconsumer_name — имя субпотребителя

pgq_coop.unregister_subconsumer

queue_name (TEXT)
consumer_name (TEXT)
subconsumer_name (TEXT)
batch_handling (INT)

Отмена регистрации субпотребителя в очереди.

Параметры функции:

  • queue_name — имя очереди;

  • consumer_name — имя потребителя;

  • subconsumer_name — имя субпотребителя;

  • batch_handling — код, указывающий, как завершать текущую обработку:

    • 0 — игнорировать незавершенные сеансы;

    • 1 — завершить сеанс автоматически;

    • 2— завершить с ошибкой

pgq_coop.version

-

Версия расширения.

Доработка#

Доработка не проводилась.

Ограничения#

Требуется предварительная установка расширения pgq.

Установка#

sudo dnf install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
sudo yum install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm
sudo apt install pangolin-pgq-coop-{version_component}_amd64.deb
sudo apt-get install pangolin-pgq-coop-{version_component}-{OS}.x86_64.rpm

Пример заполненной команды:

cd distributive

sudo apt-get -y install pangolin-pgq-coop-3.4-rhel8.7.x86_64.rpm

Модуль считается «доверенным», поэтому его могут устанавливать пользователи, имеющие право CREATE в текущей базе данных:

CREATE EXTENSION pgq_coop;

Настройка#

Настройка не требуется.

Использование модуля#

Работа с кооперативной очередью#

  1. Создайте расширение pgq_coop:

    SET ROLE db_admin;
    CREATE EXTENSION pgq_coop;
    \dx pgq_coop
    

    Результатом выполнения — вывод версии расширения, например:

                                          List of installed extensions
    ┌──────────┬─────────┬────────────┬─────────────────────────────────────┐
       Name    Version    Schema                Description             
    ├──────────┼─────────┼────────────┼─────────────────────────────────────┤
     pgq_coop  3.4      pg_catalog  Cooperative queue consuming for PgQ 
    └──────────┴─────────┴────────────┴─────────────────────────────────────┘ (1 row)
    
  2. Cоздайте очередь coop_q:

    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.create_queue('coop_q');
    
  3. Создайте для очереди coop_q потребителя и два субпотребителя, связанных с ним (subconsumer):

    SET ROLE tuz_pgq_reader;
    SELECT pgq.register_consumer('coop_q','test_consumer');
    SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer1');
    SELECT pgq_coop.register_subconsumer('coop_q','test_consumer','test_subconsumer2');
    
  4. Заполните очередь десятью блоками по 10000 сообщений:

    SET ROLE tuz_pgq_writer;
    DO
    $$
    DECLARE
    i int4;
    BEGIN
    FOR i IN 1..10 LOOP
    PERFORM pgq.insert_event('coop_q',id::text,format('MSG %1$s',id)) from generate_series(1,1000) id;
    END LOOP;
    END
    $$;
    \watch 0
    
  5. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx
    

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
     queue_name                coop_q                        
     queue_ntables             3                             
     queue_cur_table           0                             
     queue_rotation_period     02:00:00                      
     queue_switch_time         2023-11-16 13:08:57.838897+03 
     queue_external_ticker     f                             
     queue_ticker_paused       f                             
     queue_ticker_max_count    500                           
     queue_ticker_max_lag      00:00:03                      
     queue_ticker_idle_period  00:01:00                      
     ticker_lag                00:00:04.829513               
     ev_per_sec                2041.7830216193786            
     ev_new                    0                             
     last_tick_id              20                            
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────────────────────┐
     queue_name      coop_q                          
     consumer_name   test_consumer                   
     lag             00:00:56.706897                 
     last_seen       00:00:55.567714                 
     last_tick       4                               
     current_batch   NULL                            
     next_tick       NULL                            
     pending_events  119999                          
    ├─[ RECORD 2 ]───┼─────────────────────────────────┤
     queue_name      coop_q                          
     consumer_name   test_consumer.test_subconsumer1 
     lag             NULL                            
     last_seen       00:00:55.562226                 
     last_tick       NULL                            
     current_batch   NULL                            
     next_tick       NULL                            
     pending_events  NULL                            
    ├─[ RECORD 3 ]───┼─────────────────────────────────┤
     queue_name      coop_q                          
     consumer_name   test_consumer.test_subconsumer2 
     lag             NULL                            
     last_seen       00:00:54.789057                 
     last_tick       NULL                            
     current_batch   NULL                            
     next_tick       NULL                            
     pending_events  NULL                            
    └────────────────┴─────────────────────────────────┘
    
  6. Пример автоматизированного конвейра получения и обработки сеансов:

    SET ROLE tuz_pgq_reader;
    DO
    $$
    DECLARE
    v_batch_id int8;
    v_event_cnt int8;
    BEGIN
    -- subconsumer 1
    SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer1') INTO v_batch_id;
    IF v_batch_id ISNULLNOTNULL THEN
    RETURN;
    END IF;
    SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
    RAISE NOTICE 'Got % events from batch for subconsumer1 %', v_event_cnt::text,v_batch_id::TEXT;
    PERFORM pgq_coop.finish_batch(v_batch_id);
    END IF;
    --subconsumer 2
    SELECT next_batch FROM pgq_coop.next_batch('coop_q','test_consumer','test_subconsumer2') INTO v_batch_id;
    IF v_batch_id ISNULLNOTNULL THEN
    RETURN;
    END IF;
    SELECT count(*) FROM pgq.get_batch_events(v_batch_id) INTO v_event_cnt;
    RAISE NOTICE 'Got % events from batch for subconsumer2 %', v_event_cnt::text,v_batch_id::TEXT;
    PERFORM pgq_coop.finish_batch(v_batch_id);
    END IF;
    END;
    $$;
    \watch 1
    

    При успешном результате наблюдается последовательное увеличение batch_id и периодическое получение сообщений до исчерпания очереди.

  7. Получите статистическую информацию по очереди и потребителям:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.get_queue_info()\gx
    SELECT * FROM pgq.get_consumer_info()\gx
    

    Результат выполнения команды — получение таблиц вида:

    SELECT * FROM pgq.get_queue_info()\gx
    ┌─[ RECORD 1 ]─────────────┬───────────────────────────────┐
     queue_name                coop_q                        
     queue_ntables             3                             
     queue_cur_table           0                             
     queue_rotation_period     02:00:00                      
     queue_switch_time         2023-11-15 14:18:03.430572+03 
     queue_external_ticker     f                             
     queue_ticker_paused       f                             
     queue_ticker_max_count    500                           
     queue_ticker_max_lag      00:00:03                      
     queue_ticker_idle_period  00:01:00                      
     ticker_lag                00:00:58.33156                
     ev_per_sec                11.074713891632932            
     ev_new                    0                             
     last_tick_id              49                            
    └──────────────────────────┴───────────────────────────────┘
    SELECT * FROM pgq.get_consumer_info()\gx
    ┌─[ RECORD 1 ]───┬─────────────────────────────────┐
     queue_name      coop_q                          
     consumer_name   test_consumer                   
     lag             00:04:46.626124                 
     last_seen       00:00:51.636231                 
     last_tick       46                              
     current_batch   NULL                            
     next_tick       NULL                            
     pending_events  0                               
    ├─[ RECORD 2 ]───┼─────────────────────────────────┤
     queue_name      coop_q                          
     consumer_name   test_consumer.test_subconsumer1 
     lag             NULL                            
     last_seen       00:00:51.636231                 
     last_tick       NULL                            
     current_batch   NULL                            
     next_tick       NULL                            
     pending_events  NULL                            
    ├─[ RECORD 3 ]───┼─────────────────────────────────┤
     queue_name      coop_q                          
     consumer_name   test_consumer.test_subconsumer2 
     lag             NULL                            
     last_seen       00:00:51.636231                 
     last_tick       NULL                            
     current_batch   NULL                            
     next_tick       NULL                            
     pending_events  NULL                            
    └────────────────┴─────────────────────────────────┘
    
  8. Удалите очередь:

    SET ROLE tuz_pgq_reader;
    SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);
    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.drop_queue(x_queue_name =>'coop_q', x_force => TRUE);
    

    Результат выполнения команды — очередь удалена.

    Прав tuz_pgq_reader недостаточно для удаления очереди. Прав tuz_pgq_admin достаточно для удаления очереди.

Ссылки на документацию разработчика#

Функции расширения pgq_coop: https://github.com/pgq/pgq-coop.