RateLimiterPlugin#
Описание плагина#
Плагин позволяет контролировать скорость отправки сообщений producer на определенный адрес на стороне сервера. В случае если скорость отправки сообщений будет превышать пороговое значение, установленное в конфигурационном файле, то сообщения будут поступать в очередь со скоростью, равной пороговому значению. При этом часть сообщений не будет поступать в очередь на сервер.
Подключение плагина#
В classpath artemis-брокера должны быть добавлен jar-файл плагина.
В конфигурационном файле broker.xml в части настройки плагинов необходимо добавить следующие настройки:
<broker-plugins>
...
<broker-plugin class-name="ru.sbt.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin">
<property key="CONFIG_PATH" value="path_to_config_file" /> - путь до конфигурационного файла
</broker-plugin>
...
</broker-plugins>
Сформировать конфигурационный файл
rate_limiter_plugin.conf, в котором должны быть указаны пороговые значения по скорости отправки сообщений producer для определенного адреса.
Пример конфигурационного файла:
rateLimiterPlugin: {
mode: on - включение/отключение плагина (обязательный параметр)
reloadCheckPeriodMs: 1000 - период (в ms) проверки изменения конфигурационного файла (необязательный параметр, значение по умолчанию - 5000 ms)
threadPoolSize: 4 - количество потоков, используемое плагином для реализации лимита(необязательный параметр)
logMsgCreationPeriod: 5000 - период формирования лога (logLevel=INFO) о превышении порогового значения по скорости отправки сообщений (необязательный параметр, значение по умолчанию - 5_000 ms)
usersFilePath: etc/cert-users.properties - путь до файла с пользователями, необходимый для использования user-wildcard (необязательный параметр)
delayMs: 3000 - период измерения скорости поступления сообщений на сервер от producer (необязательный параметр, значение по умолчанию - 2000)
excludeAddresses: ["DLQ", "DLQ::DLQ", ...] - наименования адресов, адресов::очередей, которые нужно исключить из результирующего перечня адресов, для которых будет применен плагин (необходимо явно прописать наименования как для адреса, так и для адрес::очередь, wildcard-синтаксис применять нельзя)
producerRates: [
{
addresses: ["test.address_1"] - список наименований адресов(обязательный параметр)(поддерживаются wildCard-шаблоны Artemis, FQQN-адреса, а также wildcard для адрес::очередь)*
threshold: 5 - пороговое значение в msg/sec (обязательный параметр),
user: broker - имя producer, ассоциированное с producer на сервере (обязательный параметр)
},
{
address: "test.address_2" - DEPRECATED - наименование адреса(обязательный параметре в случае отсутствия параметра `addresses`), ниже по приоритету по сравнению с `adresses`, имеет такой же функционал
threshold: 50
user: user_2
},
...
]
}
* - поддерживается wildcard-синтаксис на уровне адрес и адрес::очередь. Примеры значений: # - все адреса, #::# - все возможные комбинации адрес::очередь.
Особенности заполнения параметров в настройках RateLimiterPlugin#
addressВ случае если в одном адресе(
ADDRESS) определена только одна очередь(QUEUE), то клиент при формировании producer может использовать значение адреса в двух возможных вариантах -ADDRESSилиADDRESS::QUEUE- для отправки сообщений в очередь.
Поэтому рекомендуется в настройках плагина прописывать два возможных варианта использования адреса клиентом. Например:
rateLimiterPlugin: {
...
producerRates: [
{
addresses: ["ADDRESS","ADDRESS::QUEUE"]
threshold: 50
user: user_1
}
...
]
}
Необходимо применить действие плагина для user_1 на все адреса и все возможные комбинации адрес::очередь, за исключением адреса
DLQиDLQ::DLQ
rateLimiterPlugin: {
...
excludeAddresses: ["DLQ", "DLQ::DLQ"]
producerRates: [
{
addresses: ["#","#::#"]
threshold: 50
user: user_1
}
...
]
}
threshold— параметр должен иметь значение строго больше нуля.
Особенности работы плагина при превышении порогового значения скорости отправки сообщений на адрес#
В случае отправки сообщений на сервер в асинхронном режиме:
на стороне сервера в логах должна отображаться запись:
ERROR org.apache.activemq.artemis.core.server - AMQ224016: Caught exception r.s.s.a.r.p.ProducerRateLimiterPlugin - PRODUCER_RATE_MAX_EXCEPTION:the rate of sending messages exceeds the threshold value for the address!
В случае отправки сообщений на сервер в блокирующем режиме:
с сервера на клиент должно прокидываться исключение
org.apache.activemq.artemis.api.core.ActiveMQException: PRODUCER_RATE_MAX_EXCEPTION:the rate of sending messages exceeds the threshold value for the address(threshold=value), code:GENERIC_EXCEPTION
В обоих случаях отправки (асинхронном и блокирующем) на стороне сервера в логах должна отображаться запись:
INFO r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin - PRODUCER_RATE_MAX_EXCEPTION:the rate of sending messages from Producer[address=address_value, user:user_value] exceeds the threshold value
Период формирование лога с уровнем INFO определяется настройкой logMsgCreationPeriod.
Также формируется лог на уровне TRACE с таким же содержанием, как и лог с уровнем INFO. Формирование такого лога происходит при каждом прерывании потока роутинга сообщения в очередь.
Особенности перехвата исключения о превышении скорости отправки сообщений на стороне клиента при асинхронной отправке сообщений#
Для перехвата исключения на стороне клиента необходима:
имплементация интерфейса SendAcknowledgementHandler,
установка параметра
confirmationWindowSizeв положительное значение, например - 10 Мб.
class AckHandler extends SendAcknowledgementHandler {
override def sendAcknowledged(message: Message) {...}
override def sendFailed(message: Message, exception: Exception) {
exception.getMessage match {
case msg: String if msg.startsWith("PRODUCER_RATE_MAX_EXCEPTION") => ...
...
}
}
Подключение SendAcknowledgementHandler
Artemis-core-client:
import org.apache.activemq.artemis.api.core.client.ClientSession
val session: ClientSession = ...
session.setSendAcknowledgementHandler(new AckHandler(...))
Artemis-JMS-client:
import javax.jms.Session
val session: Session = ...
val coreSession = session.asInstanceOf[ActiveMQSession].getCoreSession
coreSession.setSendAcknowledgementHandler(new AckHandler(...))
**Для перехвата исключения на стороне клиента возможно использование обработчика Rate-max-exception_handler.
Метрики#
Плагин обладает следующим набором метрик:
Счетчик количества превышений порогового значение скорости отправки сообщений в адрес для конкретного пользователя.
При инициализации плагина, метрика создается для всех пар address:user, указанных в конфигурационном файле.
MBean name для метрики для адреса test_address и пользователей с именами user_1, user_2
org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_1",name="producerThrottleCounter"
org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_2",name="producerThrottleCounter"
Значение скорости отправки сообщений в адрес для конкретного пользователя.
При инициализации плагина, метрика создается для всех пар address:user, указанных в конфигурационном файле.
MBean name для метрики для адреса test_address и пользователей с именами user_1, user_2
org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_1",name="rateMeterMetric"
org.apache.activemq.artemis:broker="brokerName",component=rate_limiter_metric,address="test_address",user="user_2",name="rateMeterMetric"
Статистические данные (средние значения, максимальное и минимальное значение, перцентили) для полученных данных по скорости отправки сообщений в адрес для конкретного пользователя (указаны в п.2)
MBean name для метрики для адреса test_address и пользователей с именами user_1, user_2
org.apache.activemq.artemis:broker="brokerName",component="rate_limiter_metric",address="test_address",user="user_1",name="rateMeterStatistics"
org.apache.activemq.artemis:broker="brokerName",component="rate_limiter_metric",address="test_address",user="user_2",name="rateMeterStatistics"
Обработка ошибок загрузки/перезагрузки конфигурации плагина#
Механизм перезагрузки конфигурации реализует следующее поведение в случае возникновения ошибок при чтении или обработке конфигурации методом configProcessor:
если ошибка происходит при начальной инициализации плагина, то выбрасывается исключение
ActiveMQException, которое приводит к остановке брокера.Пример фрагмента лога:
ERROR r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin - Error occurred on loading PRODUCER_RATE_LIMITER_PLUGIN configuration after initial initialisation
ERROR ru.sbt.ss.artemis.plugin.commons.utils.CommonUtils$ - An error occurred while executing a block of code:String: 1: No configuration setting found for key 'producerRates'
com.typesafe.config.ConfigException$Missing: String: 1: No configuration setting found for key 'producerRates'
at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:157)
at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:175)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
...
ERROR org.apache.activemq.artemis.core.server - AMQ224000: Failure in initialisation
org.apache.activemq.artemis.api.core.ActiveMQException: String: 1: No configuration setting found for key 'producerRates'
at ru.sbt.ss.artemis.plugin.commons.utils.CommonUtils$.$anonfun$orThrowActiveMQException$1(CommonUtils.scala:26)
at scala.util.Failure.fold(Try.scala:247)
...
если ошибка происходит при перезагрузке плагина из-за обновления конфигурации, то плагин получает последнюю успешно загруженную конфигурацию через callback-метод
onProcessConfig.Пример фрагмента лога:
INFO r.s.s.a.p.c.config.reload.PluginConfigurationReloadCallback - Reloading PRODUCER_RATE_LIMITER_PLUGIN
ERROR r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin - Error occurred on loading PRODUCER_RATE_LIMITER_PLUGIN configuration after reload
com.typesafe.config.ConfigException$Missing: String: 1: No configuration setting found for key 'producerRates'
at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:157)
at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:175)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:189)
...
WARN r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin - Previous successful configuration will be used: PRODUCER_RATE_LIMITER_PLUGIN_CONFIG:{
"excludeAddresses" : [],
"delay" : 2000,
"logMsgCreationPeriod" : 4000,
"mode" : "off",
"producerRates" : [],
"threadPoolSize" : 4
}
INFO r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin - Configuration: PRODUCER_RATE_LIMITER_PLUGIN_CONFIG:{
"excludeAddresses" : [],
"delay" : 2000,
"logMsgCreationPeriod" : 4000,
"mode" : "off",
"producerRates" : [],
"threadPoolSize" : 4
}
processed successfully: PluginSettings[onMode:false, rateSettings:List(), excludeAddresses:Set(), logMsgCreationPeriod:4000, threadPoolSize:4]
если ошибки при обработке отсутствуют, то плагин получает новую загруженную из файла конфигурацию через callback-метод
onProcessConfig.Пример фрагмента лога:
INFO r.s.s.a.p.c.config.reload.PluginConfigurationReloadCallback - Reloading PRODUCER_RATE_LIMITER_PLUGIN
INFO r.s.ss.artemis.rate_limiter.plugin.ProducerRateLimiterPlugin - Configuration: PRODUCER_RATE_LIMITER_PLUGIN_CONFIG:{
"excludeAddresses" : [],
"delay" : 2000,
"logMsgCreationPeriod" : 5000,
"mode" : "off",
"producerRates" : [],
"threadPoolSize" : 8
}
processed successfully: PluginSettings[onMode:false, rateSettings:List(), excludeAddresses:Set(), logMsgCreationPeriod:5000, threadPoolSize:8]