MQTT протокол#
Описание#
MQTT — это легковесный клиент-серверный протокол, предназначенный для обмена сообщениями по модели публикация-подписка. MQTT был специально разработан для сокращения транспортных накладных расходов (и, следовательно, сетевого трафика) и запрогораммированного футпринта на клиентских устройствах. По этой причине MQTT идеально подходит для ограниченных устройств, таких как датчики и приводы, и он быстро стал стандартным протоколом коммуникации defacto для IoT.
Apache Activemq Artemis поддерживает следующие версии MQTT (со ссылками на их соответствующие спецификации):
По умолчанию в ActiveMQ Artemis есть акцепторы, настроенные для принятия соединений MQTT на портах 61616 и 1883.
Полное использование данного протокола приведено в опенсорсной документации Apache Activemq Artemis.
Общие сведения об MQTT#
Что нужно знать о MQTT?#
MQTT не использует адреса и очереди, mqtt использует топики.
топик (Topic) - эндпоинт на брокере, в который пишут продюсеры и из которого читают консюмеры;
топики можно огранизовывать в иерархии типа sberbank/Controller/<serial_number>/In/Value с помощью /, это пригодится в wildcard подписках;
клиенты (продюсеры и консюмеры) отправляют сообщения в топики. Топики именуются с помощью знака «/», например
mqtt/test/topic. Брокеры SMB(Apache ActiveMQ Artemis) работают в парадигме адресов и очередей. При этом имя топика, в который пишет продюсер преобразуется в multicast адрес, где символ «/» заменяется на «.», напримерmqtt.test.topic.
MQTT-консюмеры не читают очереди, они подписываются на топики (Subscription).
консюмер при подписке к топику («имя топика») создает в адресе («имя адреса») очередь с названием «<clientID для консюмера>.<имя адреса>», например
consumer1.mqtt.test.topic
MQTT использует концепцию publish/subscribe:
топики/подписки не создаются до подключения клиентов;
несколько консюмеров могут подписаться на один и тот же топик и получать одни и те же сообщения;
консюмеры получают только те сообщения, которые были отправлены в топик после их подписки. Если в топике нет консюмеров - сообщения теряются.
Настройка client.id в MQTT важна:
на ее основе создаются очереди для клиентов;
несколько клиентов с одинаковым client.id не могут работать одновременно, новый клиент отключает старый.
MQTT имеет несколько разных версий, используются две версии: 3.1.1 и 5.
Дополнительные возможности для MQTT 5:
появилась простая форма управления потоком (flow control);
введено использование механизма числовых псевдонимов топиков (topic aliasing);
поддержка shared-подписок;
введен параметр «максимальный размер пакета» (maximum packet size);
MQTT 5 и MQTT 3 используется различная конфигурация очистки сессий (CleanSession) Добавлена возможность использовать заголовки для MQTT-сообщений (MQTT 3 - заголовков у сообщений нет)
MQTT поддерживает только байтовые сообщения Настройки форматов существуют, но Apache ActiveMQ Artemis их не использует:
payloadFormatIndicator (UTF-8, UNCPECIFIED) - главный кандидат на байтовое/текстовое сообщение, по спеке должен валидировать формат на брокере;
content-type - заголовок с mime-типом, по спеке игнорируется брокером.
MQTT не поддерживает large messages, они конвертируются в regular.
MQTT может создавать дополнительные системные очереди:
$sys.mqtt.sessions - хранит данные persistent-сессий
$sys.mqtt.retain.<имя_топика> - retain-сообщения для топиков
$sys.mqtt.queue.qos2.<имя_топика> - управляющие очереди для
exactly once(QoS 2) семантики доставки сообщений
Виды подписок#
Обычная подписка
Основной сценарий, в котором продюсер отправляет сообщения в топик mqtt/test/topic, один или несколько консюмеров подписываются на этот же топик.
Для отправки сообщений на другой брокер должен использоваться механизм балансировки через адрес.

WildCard подписка
Вид подписки, при котором консюмер может подписаться сразу на несколько топиков с помощью wildcard синтаксиса, например:
mqtt/test/+;
mqtt/+/topic;
mqtt/#.
Несколько продюсеров отправляют сообщения в разные топики, несколько консюмеров подписываются сразу на все топики. Для отправки сообщений на другой брокер должен использоваться механизм балансировки через адрес.

В UI символ «*» будет отображаться с символом «" перед ним в качестве экрана.
Shared подписка
Одна подписка на несколько топиков делится между несколькими клиентами в группе, при этом сообщения из топика распределяются между клиентами алгоритмом round-robin(или иным внутренним алгоритмом Apache ActiveMQ Artemis). Сообщения должны распределяться между консюмерами в одной группе и дублироваться для разных групп. Для отправки сообщений на другой брокер должен использоваться механизм балансировки через адрес. Актуально для версии MQTT 3.1.1 в Apache ActiveMQ Artemis.

Shared-WildCard подписка
Одна подписка на несколько топиков делится между несколькими клиентами в группе, при этом сообщения из топика распределяются между клиентами алгоритмом round-robin.

Дополнительное пояснение к схеме:
Создается новый адрес mqtt..topic на каждом брокере в кластере, который соответствует mqtt/+/topic.
Создается новая очередь group1.mqtt..topic в новом адресе на каждом брокере.
При этом всем адресам (mqtt.test.topic, mqtt.other.topic, mqtt..topic) присваивается Queue Name = group1.mqtt..topic, а также RemoteQueueNames = group1.mqtt.*.topic
В UI символ «*» будет отображаться с символом «" перед ним в качестве экрана.
Одновременная обычная и shared подписки
Консюмер и группа консюмеров одновременно подписываются на один топик. Группы консюмеров и отдельные консюмеры должны получать дубликаты сообщений.

Дополнительные возможности MQTT#
QOS
QOS (Quality of service) настраивается на клиенте и отвечает за семантику доставки:
0 AT_MOST_ONCE - сообщение возможно будет доставлено один раз, без подтверждений об отправке/получении. При любых проблемах доставки сообщение теряется;
1 AT_LEAST_ONCE - сообщение будет гарантированно доставлено один раз или более, используются подтверджения об отправке/получении. При проблемах доставки сообщение будет переотправлено, что может породить дубли сообщения;
2 EXACTLY_ONCE сообщений будет гарантированно доставлено ровно один раз, без дублей. Для этого режима дополнительно используются системные очереди $sys.mqtt.queue.qos2.<имя_топика>.
Flow Control
Ограничение кол-ва сообщений при одновременной асинхронной отправке, только для MQTT 5.
брокер может сообщить клиенту, сколько сообщений QoS 1 и QoS 2 он может получить до подтверждения, и наоборот;
эта настройка регулируется на брокере путем установки URL-параметра receiveMaximum для MQTT-акцептора в broker.xml;
значение по умолчанию равно 65 535 (максимальное значение 2-байтового целого числа, используемого MQTT);
значение, равное 0, запрещено спецификацией MQTT 5;
значение -1 не позволит брокеру информировать клиента о максимальном количестве получаемых данных, что означает, что управление потоком данных от клиентов к брокеру будет отключено. По сути, это то же самое, что установить значение 65 535, но уменьшает размер пакета CONNACK на несколько байт.
Retain messages
Отдельный вид сообщений, которые сохраняются в топике и отправляются в первую очередь каждому новому подключенному консюмеру.
для хранения используются служебные retain-очереди $sys.mqtt.retain.<имя_топика>;
retain-сообщения хранятся пока клиент явно их не удалит или не истечёт (expires) время их жизни;
за автоудаление пустых служебных retain-очередей отвечает настройка из address-settings:
<address-setting match="$sys.mqtt.retain.#">
<auto-delete-queues>true</auto-delete-queues>
<auto-delete-addresses>true</auto-delete-addresses>
</address-setting>
Will messages
Отдельный вид сообщений, который может отправляться на брокер в момент подключения клиента.
вместе с сообщением указываются топики, в которые отправляется сообщение;
сообщение отправляется на указанные топики в момент отключения консюмера, который отправил will-сообщение.
Параметры брокера для работы с MQTT#
Параметры |
Местонахождение |
Значение по умолчанию |
Описание |
|---|---|---|---|
mqtt-session-scan-interval |
mqtt-session-scan-interval |
mqtt-session-scan-interval |
Определяет интервал сканирования сессий MQTT на сервере. Он показывает как часто сервер будет проверять состояние сессий и удалять устаревшие сессии. Это может быть полезно для управления ресурсами сервера и предотвращения утечки памяти |
mqtt-session-state-persistence-timeout |
broker.xml |
5000 мс |
Время ожидания брокером окончания записи на диск перед возбуждением исключения (throwing an error). Настройка определяет время, в течение которого состояние сессии MQTT сохраняется на сервере. Если клиент отключается от сервера, то сервер сохраняет состояние сессии на заданное время, чтобы клиент мог восстановить сессию при повторном подключении. Если время истекает, то состояние сессии удаляется и клиент должен начать новую сессию. Эта настройка может быть полезна для обеспечения надежности передачи сообщений и сохранения состояния сессии при временных отключениях клиента |
defaultMqttSessionExpiryInterval |
MQTT-акцептор |
5000 мс |
По умолчанию может быть настроен на MQTT-акцепторе таким образом, что заброшенные сеансы и очереди подписки будут автоматически очищаться по истечении интервала истечения срока действия |
serverKeepAlive |
MQTT-акцептор |
60 сек |
Определяет максимальное время, в течение которого клиент может быть отключен от сервера, прежде чем сервер закроет сессию. Это может быть полезно для обеспечения надежности передачи сообщений и предотвращения утечки ресурсов сервера; значение, равное 0, полностью отключает функцию keep alives, независимо от того, установлено ли для клиента значение keep alive. Делать подобное не рекомендуется, поскольку отключение функции keep alives обычно считается опасным, так как может привести к исчерпанию ресурсов; значение -1 означает, что брокер всегда будет принимать значение keep alive для клиента (даже если это значение равно 0); любое другое значение означает, что serverKeepAlive будет применено, если оно меньше значения keep alive для клиента (за исключением, когда клиентский keep alive равен 0 и будет приоритетно применено значение serverKeepAlive, чтобы предотвратить небезопасное отключение функции keep alives) |
allowLinkStealing |
MQTT-акцептор |
true |
Когда новый клиент подключается с тем же идентификатором клиента, что и у другого существующего клиента, сеанс уже существующего клиента будет закрыт. По умолчанию allowLinkStealing имеет значение true. Если для этого параметра установлено значение false, то всякий раз, когда новый клиент подключается с тем же идентификатором клиента, что и у другого существующего клиента, будет закрыт сеанс нового клиента |
receiveMaximum |
MQTT-акцептор |
65 535 |
Настройка flow control. Брокер может сообщить клиенту, сколько сообщений QoS 1 и QoS 2 он может получить до подтверждения, и наоборот. Значение, равное 0, запрещено спецификацией MQTT 5; значение -1 не позволит брокеру информировать клиента о максимальном количестве получаемых данных, что означает, что управление потоком данных от клиентов к брокеру будет отключено. По сути, это то же самое, что установить значение 65 535, но уменьшает размер пакета CONNACK на несколько байт. |
topicAliasMaximum |
MQTT-акцептор |
65 535 |
В MQTT 5 введено использование механизма числовых псевдонимов топиков (topic aliasing). Служит для оптимизация размера пакетов управления публикацией, поскольку теперь вместо строкового имени топика, которое потенциально может быть довольно длинным, можно использовать 2-байтовое целое значение; обе стороны, и клиент, и брокер могут сообщать друг другу о максимальном значении псевдонима, которое они поддерживают (т.е. сколько разных псевдонимов-чисел может быть использовано); значение по умолчанию равно 65 535 (максимальное значение 2-байтового целого числа, используемого MQTT); значение, равное 0, отключит передачу псевдонимов топиков от клиентов брокеру; значение, равное -1, не позволит брокеру информировать клиента о максимальном количестве псевдонимов топиков, что означает, что передача псевдонимов топиков от клиентов брокеру будет отключена. Это фактически то же самое, что установить значение равным 0, но уменьшает размер пакета CONNACK на несколько байт. |
maximumPacketSize |
MQTT-акцептор |
268 435 455 |
В MQTT 5 введен максимальный размер пакета. Это максимальный размер пакета, который сервер или клиент готов принять. Значение 0 запрещено спецификацией MQTT 5; значение -1 не позволит брокеру информировать клиента о каком-либо максимальном размере пакета, что означает, что размер входящих пакетов не будет ограничен, что также уменьшает размер пакета CONNACK на несколько байт. |
closeMqttConnectionOnPublishAuthorizationFailure |
MQTT-акцептор |
true |
Спецификация MQTT 3.1.1 неоднозначна в отношении поведения брокера при сбое публикации пакета из-за отсутствия авторизации (можно или закрыть соединение или дать положительное подтверждение). По умолчанию брокер будет закрывать соединение. Чтобы изменить это поведение на выдачу брокером положительного подтверждения нужно использовать URL-параметр конфигурации |
Тонкости работы с MQTT#
Для добавления роли пригодной для MQTT в конфигурационном файле vars.yml в блоке artemis_roles нужно задать параметры:
- user: CN=myname, OU=00CA, O=SBRF, L=Moscow, ST=Moscow, C=RU
topic: this/is/#
type: consume,browse,send
, где
user- DN пользователя;topic- имя топика;type- тип операции.
В зависимости от комбинации типов в security-settings.xml будут добавлены разрешения:
<security-setting match="this.is.#">
<permission type="createNonDurableQueue" roles="consume_this_is_#"/>
<permission type="deleteNonDurableQueue" roles="consume_this_is_#"/>
<permission type="createDurableQueue" roles="consume_this_is_#"/>
<permission type="deleteDurableQueue" roles="consume_this_is_#"/>
<permission type="createAddress" roles="send_this_is_#,consume_this_is_#"/>
<permission type="deleteAddress" roles="consume_this_is_#"/>
<permission type="consume" roles="consume_this_is_#"/>
<permission type="browse" roles="browse_this_is_#"/>
<permission type="send" roles="send_this_is_#"/>
</security-setting>
Для корректной работы MQTT протокола для адресов в конфигурационном файле vars.yml в блоке artemis_addresses необходимо указать настройки:
artemis_addresses:
defaults: # дефолтные значения конфигураций для адресов
default_address_routing_type: 'MULTICAST' # routing_type:MULTICAST - тип маршрутизации, используемый для автоматически созданных адресов
auto_create_queues: 'true' # bool:true - создавать ли автоматически очередь, когда клиент отправляет сообщение или пытается потребить сообщение из очереди
auto_create_addresses: 'true' # bool:true - создавать ли автоматически адреса, когда клиент отправляет сообщение или пытается потребить сообщение из очереди, сопоставленной с адресом, который не существует
auto_delete_queues: 'true' # bool:true - удалять ли автоматически созданные очереди, когда очередь имеет 0 потребителей и 0 сообщений
auto_delete_addresses: 'true' # bool:true - удалять ли автоматически созданные адреса, когда они больше не имеют очередей