RateLimiterPlugin#

Описание плагина#

Плагин позволяет контролировать скорость отправки сообщений producer на определенный адрес на стороне сервера. В случае если скорость отправки сообщений будет превышать пороговое значение, установленное в конфигурационном файле, то сообщения будут поступать в очередь со скоростью, равной пороговому значению. При этом часть сообщений не будет поступать в очередь на сервер.

Подключение плагина#

  1. В classpath artemis-брокера должны быть добавлен jar-файл плагина.

  2. В конфигурационном файле broker.xml в части настройки плагинов необходимо добавить следующие настройки:

    <broker-plugins>
    ...
        <broker-plugin class-name="ru.sbt.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin">
            <property key="CONFIG_PATH" value="path_to_config_file" /> - путь до конфигурационного файла 
        </broker-plugin>
    ...
    </broker-plugins>
  1. Сформировать конфигурационный файл rate_limiter_plugin.conf, в котором должны быть указаны пороговые значения по скорости отправки сообщений producer для определенного адреса.

Пример конфигурационного файла:

    rateLimiterPlugin: {
        mode: on - включение/отключение плагина (обязательный параметр)
        reloadCheckPeriodMs: 1000 - период (в ms) проверки изменения конфигурационного файла (необязательный параметр, значение по умолчанию - 5000 ms)
        threadPoolSize: 4 - количество потоков, используемое плагином для реализации лимита(необязательный параметр)
        logMsgCreationPeriod: 5000 - период формирования лога (logLevel=INFO) о превышении порогового значения по скорости отправки сообщений (необязательный параметр, значение по умолчанию - 5_000 ms)
        usersFilePath: etc/cert-users.properties - путь до файла с пользователями, необходимый для использования user-wildcard (необязательный параметр)
        delayMs: 3000 - период измерения скорости поступления сообщений на сервер от producer (необязательный параметр, значение по умолчанию - 2000)
        excludeAddresses: ["DLQ", "DLQ::DLQ", ...] - наименования адресов, адресов::очередей, которые нужно исключить из результирующего перечня адресов, для которых будет применен плагин (необходимо явно прописать наименования как для адреса, так и для адрес::очередь, wildcard-синтаксис применять нельзя)
        producerRates: [ 
           {
                addresses: ["test.address_1"] - список наименований адресов(обязательный параметр)(поддерживаются wildCard-шаблоны Artemis, FQQN-адреса, а также wildcard для адрес::очередь)*
                threshold: 5 - пороговое значение в msg/sec (обязательный параметр),
                user: broker - имя producer, ассоциированное с producer на сервере (обязательный параметр)
            },
            {
                address: "test.address_2" - DEPRECATED - наименование адреса(обязательный параметре в случае отсутствия параметра `addresses`), ниже по приоритету по сравнению с `adresses`, имеет такой же функционал
                threshold: 50
                user: user_2
           }, 
           ...
        ]
    }

* - поддерживается wildcard-синтаксис на уровне адрес и адрес::очередь. Примеры значений: # - все адреса, #::# - все возможные комбинации адрес::очередь.

WildCard-шаблоны
FQQN-адреса

Особенности заполнения параметров в настройках RateLimiterPlugin#

  • address

    • В случае если в одном адресе(ADDRESS) определена только одна очередь(QUEUE), то клиент при формировании producer может использовать значение адреса в двух возможных вариантах - ADDRESS или ADDRESS::QUEUE - для отправки сообщений в очередь.

    Поэтому рекомендуется в настройках плагина прописывать два возможных варианта использования адреса клиентом. Например:

   rateLimiterPlugin: {
      ...
      producerRates: [
         {
            addresses: ["ADDRESS","ADDRESS::QUEUE"]
            threshold: 50
            user: user_1
         }
         ...
      ]
   }
  • Необходимо применить действие плагина для user_1 на все адреса и все возможные комбинации адрес::очередь, за исключением адреса DLQ и DLQ::DLQ

    rateLimiterPlugin: {
       ...
      excludeAddresses: ["DLQ", "DLQ::DLQ"]
       producerRates: [
          {
              addresses: ["#","#::#"]
              threshold: 50
              user: user_1
          }
          ...
       ]
    }
  • threshold — параметр должен иметь значение строго больше нуля.

Особенности работы плагина при превышении порогового значения скорости отправки сообщений на адрес#

  1. В случае отправки сообщений на сервер в асинхронном режиме:

    • на стороне сервера в логах должна отображаться запись: ERROR org.apache.activemq.artemis.core.server  - AMQ224016: Caught exception r.s.s.a.r.p.ProducerRateLimiterPlugin - PRODUCER_RATE_MAX_EXCEPTION:the rate of sending messages exceeds the threshold value for the address!

  2. В случае отправки сообщений на сервер в блокирующем режиме:

    • с сервера на клиент должно прокидываться исключение org.apache.activemq.artemis.api.core.ActiveMQException: PRODUCER_RATE_MAX_EXCEPTION:the rate of sending messages exceeds the threshold value for the address(threshold=value), code:GENERIC_EXCEPTION

  3. В обоих случаях отправки (асинхронном и блокирующем) на стороне сервера в логах должна отображаться запись: INFO r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin  - PRODUCER_RATE_MAX_EXCEPTION:the rate of sending messages from Producer[address=address_value, user:user_value] exceeds the threshold value

Период формирование лога с уровнем INFO определяется настройкой logMsgCreationPeriod.

Также формируется лог на уровне TRACE с таким же содержанием, как и лог с уровнем INFO. Формирование такого лога происходит при каждом прерывании потока роутинга сообщения в очередь.

Особенности перехвата исключения о превышении скорости отправки сообщений на стороне клиента при асинхронной отправке сообщений#

  1. Для перехвата исключения на стороне клиента необходима:

    • имплементация интерфейса SendAcknowledgementHandler,

    • установка параметра confirmationWindowSize в положительное значение, например - 10 Мб.

   class AckHandler extends SendAcknowledgementHandler {
   
     override def sendAcknowledged(message: Message) {...}
   
     override def sendFailed(message: Message, exception: Exception) {
        exception.getMessage match {
          case msg: String if msg.startsWith("PRODUCER_RATE_MAX_EXCEPTION") => ...
        ...
     }
   }
  1. Подключение SendAcknowledgementHandler

    • Artemis-core-client:

  import org.apache.activemq.artemis.api.core.client.ClientSession  

  val session: ClientSession = ...
  session.setSendAcknowledgementHandler(new AckHandler(...))
  • Artemis-JMS-client:

  import javax.jms.Session

  val session: Session = ...
  val coreSession = session.asInstanceOf[ActiveMQSession].getCoreSession
  coreSession.setSendAcknowledgementHandler(new AckHandler(...))

**Для перехвата исключения на стороне клиента возможно использование обработчика Rate-max-exception_handler.

Метрики#

Плагин обладает следующим набором метрик:

  1. Счетчик количества превышений порогового значение скорости отправки сообщений в адрес для конкретного пользователя.

При инициализации плагина, метрика создается для всех пар address:user, указанных в конфигурационном файле.

MBean name для метрики для адреса test_address и пользователей с именами user_1, user_2

org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_1",name="producerThrottleCounter"
org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_2",name="producerThrottleCounter"
  1. Значение скорости отправки сообщений в адрес для конкретного пользователя.

При инициализации плагина, метрика создается для всех пар address:user, указанных в конфигурационном файле.

MBean name для метрики для адреса test_address и пользователей с именами user_1, user_2

org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_1",name="rateMeterMetric"
org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_2",name="rateMeterMetric"
  1. Статистические данные (средние значения, максимальное и минимальное значение, перцентили) для полученных данных по скорости отправки сообщений в адрес для конкретного пользователя (указаны в п.2)

MBean name для метрики для адреса test_address и пользователей с именами user_1, user_2

org.apache.activemq.artemis:broker="brokerName",component="rate_limiter_metric",address="test_address",user="user_1",name="rateMeterStatistics"
org.apache.activemq.artemis:broker="brokerName",component="rate_limiter_metric",address="test_address",user="user_2",name="rateMeterStatistics"

Обработка ошибок загрузки/перезагрузки конфигурации плагина#

Механизм перезагрузки конфигурации реализует следующее поведение в случае возникновения ошибок при чтении или обработке конфигурации методом configProcessor:

  • если ошибка происходит при начальной инициализации плагина, то выбрасывается исключение ActiveMQException, которое приводит к остановке брокера.

    Пример фрагмента лога:

ERROR r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin  - Error occurred on loading PRODUCER_RATE_LIMITER_PLUGIN configuration after initial initialisation
ERROR ru.sbt.ss.artemis.plugin.commons.utils.CommonUtils$  - An error occurred while executing a block of code:String: 1: No configuration setting found for key 'producerRates'
    com.typesafe.config.ConfigException$Missing: String: 1: No configuration setting found for key 'producerRates'
        at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:157)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:175)
        at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
        ...
ERROR org.apache.activemq.artemis.core.server  - AMQ224000: Failure in initialisation
    org.apache.activemq.artemis.api.core.ActiveMQException: String: 1: No configuration setting found for key 'producerRates'
        at ru.sbt.ss.artemis.plugin.commons.utils.CommonUtils$.$anonfun$orThrowActiveMQException$1(CommonUtils.scala:26)
        at scala.util.Failure.fold(Try.scala:247)
        ...
  • если ошибка происходит при перезагрузке плагина из-за обновления конфигурации, то плагин получает последнюю успешно загруженную конфигурацию через callback-метод onProcessConfig.

    Пример фрагмента лога:

INFO  r.s.s.a.p.c.config.reload.PluginConfigurationReloadCallback  - Reloading PRODUCER_RATE_LIMITER_PLUGIN
ERROR r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin  - Error occurred on loading PRODUCER_RATE_LIMITER_PLUGIN configuration after reload
    com.typesafe.config.ConfigException$Missing: String: 1: No configuration setting found for key 'producerRates'
        at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:157)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:175)
        at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
        ...
WARN  r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin  - Previous successful configuration will be used: PRODUCER_RATE_LIMITER_PLUGIN_CONFIG:{
    "excludeAddresses" : [],
    "delay" : 2000,
    "logMsgCreationPeriod" : 4000,
    "mode" : "off",
    "producerRates" : [],
    "threadPoolSize" : 4
}

INFO  r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin  - Configuration: PRODUCER_RATE_LIMITER_PLUGIN_CONFIG:{
    "excludeAddresses" : [],
    "delay" : 2000,
    "logMsgCreationPeriod" : 4000,
    "mode" : "off",
    "producerRates" : [],
    "threadPoolSize" : 4
}
 processed successfully: PluginSettings[onMode:false, rateSettings:List(), excludeAddresses:Set(), logMsgCreationPeriod:4000, threadPoolSize:4]
  • если ошибки при обработке отсутствуют, то плагин получает новую загруженную из файла конфигурацию через callback-метод onProcessConfig.

    Пример фрагмента лога:

INFO  r.s.s.a.p.c.config.reload.PluginConfigurationReloadCallback  - Reloading PRODUCER_RATE_LIMITER_PLUGIN
INFO  r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin  - Configuration: PRODUCER_RATE_LIMITER_PLUGIN_CONFIG:{
    "excludeAddresses" : [],
    "delay" : 2000,
    "logMsgCreationPeriod" : 5000,
    "mode" : "off",
    "producerRates" : [],
    "threadPoolSize" : 8
}
 processed successfully: PluginSettings[onMode:false, rateSettings:List(), excludeAddresses:Set(), logMsgCreationPeriod:5000, threadPoolSize:8]