MQTT-клиенты#
MQTT-протокол (Message Queuing Telemetry Transport) - упрощённый сетевой протокол обмена сообщениями, ориентированный на обмен сообщениями между устройствами в парадигме «издатель-подписчик» (pub/sub). Протокол предназначен для небольших устройств и устройств с ограниченной пропускной способностью (IoT). Используется для передачи информации между удалёнными локациями в случае ограниченной пропускной способности канала и небольшого размера кода. MQTT работает поверх TCP/IP, но также может использоваться с WebSocket. Apache ActiveMQ Artemis обеспечивает поддержку MQTT версий 3.1, 3.1.1, 5.0. Рекомендуется использовать версию протокола 5.0. В этой версии реализована поддержки заголовков сообщений (user properties) и ряд прочих улучшений.
Предусловия использования#
Перед настройкой Mqtt-клиента должны быть выполнены следующие пункты:
запуск брокера Artemis (описано подробнее в документе «Руководство по установке», раздел Установка);
создание адреса и очереди для записи и вычитки сообщений (описано подробнее в документе «Руководство по системному администрированию», раздел Создание адресов и очередей на брокере, выдача прав на них).
Подключение зависимостей#
Документация Apache ActiveMQ Artemis рекомендует библиотеку org.fusesource.mqtt-client, но она не поддерживает версию протокола 5.0, поэтому была использована библиотека от HiveMQ. В этой библиотеке представлен широкий функционал, например: блокирующего, асинхронного и реактивного клиентов. Ознакомиться с полной функциональностью можно на сайте официальной документации.
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>${mqtt.client.version}</version>
</dependency>
Пример создания и настройки простого клиента для отправки и чтения текстового сообщения#
Пример mqtt_1:
// Предполагается, что на брокере может быть создан топик `test/topic` (в терминологии Artemis: multicast-адрес `test.topic` с автосозданием очередей)
public static String TOPIC = "test/topic";
// 1. Создание MQTT-клиента для подключения к брокеру
var clientBuilder = MqttClient.builder().serverHost("0.0.0.0").serverPort(61616); // for plain TCP local
var client = clientBuilder
.useMqttVersion5() // версия протокола MQTT
.identifier("mqtt_test_app_id") // уникальный id клиента (одновременно к брокеру может быть подключен только один клиент с одинаковым id)
.buildBlocking(); // создание блокирующего клиента (можно использовать не блокирующий, но тогда нужно писать в цикле чтение)
client.connect(); // подключение к брокеру
// 2. Создание публикации (сообщения) для отправки в топик на брокере
var msgForPublish = Mqtt5Publish.builder()
.topic(TOPIC)
.payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
.qos(MqttQos.AT_LEAST_ONCE)
.build();
// 3. Создание подписки на топик
client.subscribeWith() // оформление подписки на топик
.topicFilter(TOPIC) // указываем топик (поддерживает wildcard)
.qos(MqttQos.AT_LEAST_ONCE) // указываем Quality of Service
.send(); // отправляем на брокер
var publishes = client.publishes(MqttGlobalPublishFilter.SUBSCRIBED); // настраиваем фильтр входящих сообщений из подписки
// 4. Отправка публикации в топик (работа клиента в режиме продюсера Artemis)
// важно оформить подписку на топик (создать консьюмера в п.3) до начала отправки сообщений продюсером иначе сообщение не будет доставлено
client.publish(msgForPublish); // отправка сообщения в топик на брокере
// 5. Вычитываем сообщение из топика подписки брокера (работа клиента в режиме консьюмера Artemis)
publishes.receive(5, TimeUnit.SECONDS)
.ifPresent(msg -> log.info("Получено сообщение: '{}'", new String(msg.getPayloadAsBytes(), Charset.defaultCharset())));
// 6. Отключение клиента от брокера и закрытие ресурсов после окончания работы с брокером
client.disconnect();
Описание интерфейсов основных сущностей клиента для MQTT-протокола#
Клиентские интерфейсы, реализованные в библиотеке HiveMQ для разных версий MQTT-протокола существенно различаются, при этом номер версии протокола добавляется префиксом к имени интерфейса, например, Mqtt3Client или Mqtt5Client.Далее рассматриваются примеры использования основных интерфейсов только для Mqtt версии 5.0.
Основные интерфейсы клиента для MQTT-протокола#
Интерфейс |
Краткое описание |
|---|---|
представляет сущность «Клиент» - инкапсулирует действия по работе в режимах публикации и подписки |
|
представляет сущность «Публикация» - отправляемое или получаемое сообщение |
|
представляет сущность «Подписка» |
|
настраивает клиентское подключение к брокеру |
|
настраивает параметры отключения клиента от брокера |
Интерфейсы обработчиков асинхронного варианта API#
Интерфейс |
Где используется |
Описание |
|---|---|---|
|
subscribe(), callback() |
callback для обработки входящих сообщений (публикаций), соответствующих оформленной подписке |
|
publishes() |
callback для обработки всех входящих сообщений (публикаций), соответствующих глобальному фильтру |
Mqtt5Client#
MQTT-клиенты используют объекты класса Mqtt5Client для создания клиентского соединения с сервером, передачи основных настроек и команд коммуникации.
Создание экземпляра клиента#
Для создания объекта класса Mqtt5Client можно воспользоваться отдельным классом-билдером MqttClientBuilder,
указав желаемую версию MQTT-протокола непосредственно перед созданием экземпляра клиента методом useMqttVersion5():
MqttClientBuilder сlientBuilder = MqttClient.builder()
.identifier("mqtt_test_app_id")
.serverHost("0.0.0.0")
.serverPort(61616);
Mqtt5Client client = сlientBuilder
.useMqttVersion5()
.build();
Можно сразу использовать билдер MQTT-клиента нужной версии (в нашем случае - v.5.0):
Mqtt5ClientBuilder сlientBuilder = Mqtt5Client.builder()
.identifier("mqtt_test_app_id")
.serverHost("0.0.0.0")
.serverPort(61616);
Mqtt5Client client = сlientBuilder.build();
Следует обратить внимание на важную роль идентификатора клиента, передаваемого в методе
identifier(). При подключении нескольких клиентов с одинаковым идентификатором для работы в режиме «Подписчика», брокер оставит активным только одного. Логика выбора определяется настройкой параметраallowLinkStealingMQTT-акцептора на брокере (см. Параметры конфигурации MQTT-протокола на стороне брокера).
Идентификатор клиента также поможет в дальнейшем во время отладки и мониторинга клиентского приложения.
Работу клиентского приложения теперь можно отслеживать в консоли управления брокера по значению Client ID = mqtt_test_app_id во вкладках Connections, Consumers, and Producers.
Для создания защищённого TLS-соединения нужно добавить параметры ssl-конфигурации, c использованием дополнительного билдера MqttClientSslConfigBuilder:
KeyManagerFactory keyManagerFactory = ...; // настраиваем предварительно KeyManagerFactory
TrustManagerFactory trustManagerFactory = ...; // настраиваем предварительно TrustManagerFactory
HostnameVerifier hostNameVerifier = ...; // настраиваем предварительно HostnameVerifier
Mqtt5ClientBuilder сlientBuilder = Mqtt5Client.builder()
.identifier("mqtt_test_app_id")
.serverHost("0.0.0.0")
.serverPort(61616);
.sslConfig() // создаём дополнительный билдер MqttClientSslConfigBuilder
.trustManagerFactory(trustManagerFactory)
.hostnameVerifier(hostNameVerifier)
.keyManagerFactory(keyManagerFactory)
.applySslConfig();
Варианты API#
Реализация MQTT-клиента в библиотеке HiveMQ предлагает 3 различных способа работы клиента с брокером и, соответственно, 3 варианта, используемого при этом, API:
синхронный стиль (
Blocking) - методы API напрямую возвращают результаты и блокируют текущий поток до тех пор, пока они не станут доступны;асинхронный стиль (
Asynchronous) - методы API возвращают;CompletableFutureвместо ожидания результата. Для потоков входящих сообщений (публикаций) используютсяcallback-функции.реактивный стиль (
Reactive) - методы API используютRxJava(Reactive StreamsиReactive Extensions).
Выбор используемого стиля работы (варианта API) клиента может осуществляться:
напрямую при создании экземпляра клиента билдером - в этом случае, вместо вызова метода
build(), следует использовать соответствующие методы:buildBlocking(),buildAsync()илиbuildRx(), например:
Mqtt5AsyncClient asyncClient = Mqtt5Client.builder().buildAsync();
путём переключения во время работы клиента - выполняется с помощью соответствующего метода:
toBlocking(),toAsync(),toRx(),
например:
Mqtt5Client client = ...; // может быть экземпляром Mqtt5BlockingClient, Mqtt5AsyncClient или Mqtt5RxClient
Mqtt5BlockingClient blockingClient = client.toBlocking();
Подключение клиента к брокеру#
После создания экземпляра MQTT-клиента с выбором требуемого режима работы, можно подключиться к брокеру методом connect():
blockingClient.connect();
Работа клиента с брокером#
Объект Mqtt5Client инкапсулирует внутри себя одновременно как работу в режиме Публикатора (продюсера сообщений на брокер Artemis)
так и работу в режиме Подписчика (консьюмера сообщений с брокера Artemis)
Работа в режиме Публикатора (pub)#
Работа клиента в pub-режиме (продюсера сообщений на брокер Artemis) может производиться двумя способами:
Формирование публикации «на лету» и его последующая передача методом
publishWith().Передача заранее подготовленной публикации (объекта
Mqtt5Publish) методомpublish().
Пример:
// 1. Формирование сообщения "на лету" и его отправка на брокер
blockingClient.publishWith()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("Тело тестового сообщения".getBytes())
.send();
// 2.1. Предварительное создание сообщения
Mqtt5Publish msgForPublish = Mqtt5Publish.builder()
.topic("test/topic")
.payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
.qos(MqttQos.AT_LEAST_ONCE)
.build();
// 2.2. Публикация сообщения
blockingClient.publish(msgForPublish);
Работа в режиме Подписчика (sub)#
Для работы в режиме Подписчика (консьюмера сообщений с брокера Artemis) необходимо:
Оформить подписку на топик одним из двух возможных способов:
1.1. «на лету» - методом
subscribeWith();1.2. с использованием заранее подготовленной подписки (объекта
Mqtt5Subscribe) - методомsubscribe().
Вычитать сообщение из топика:
2.1. в синхронном режиме - с помощью объекта
Mqtt5Publishes(накопитель входящих публикаций):2.1.1. методом
recieve()- с блокировкой на время чтения;2.1.2. методом
recieve()- с тайм-аутом ожидания;2.1.3. методом
recieveNow()- без блокировки на время чтения;
2.2. в асинхронном режиме - с помощью
callback-функции:2.2.1. в рамках оформленной подписки (методами
subscribeWith(),subscribe())2.2.2. в глобальном слушателе сообщений (методом
publishes())
Пример реализации синхронного Подписчика:
// 1.1. Подписка на сообщения топика "на лету"
blockingClient.subscribeWith()
.topicFilter("test/topic") // указываем топик
.qos(MqttQos.AT_LEAST_ONCE) // указываем Quality of Service
.send(); // отправляем команду на брокер
// 2.1. Создаём предварительно объект Mqtt5Publishes
// настраиваем фильтр входящих сообщений из подписки:
var publishes = blockingClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED);
// Вычитывание сообщения из топика подписки:
// 2.1.1. с блокировкой на время чтения
Mqtt5Publish publishMessage = publishes.receive();
// 2.1.2. с таймаутом ожидания
Optional<Mqtt5Publish> publishMessage = publishes.receive(10, TimeUnit.SECONDS);
// 2.1.3. без блокировки на время чтения
Optional<Mqtt5Publish> publishMessage = publishes.receiveNow();
Пример реализации Подписчика на асинхронном API:
// Обработчик полученных с брокера сообщений
Consumer<Mqtt5Publish> consumedMsgCallback = publishMsg -> System.out.println(new String(publishMsg.getPayloadAsBytes()));
// 2.2.2. Глобальный слушатель всех полученных клиентом сообщений
// Метод `publishes()` должен быть вызван перед методом `subscribe()`, чтобы гарантировать, что сообщение не будет потеряно.
// Этот метод можно вызвать перед подключением (connect) для получения сообщений за предыдущий сеанс.
asyncClient.publishes(MqttGlobalPublishFilter.ALL, consumedMsgCallback);
// 1.1. Подписка "на лету"
asyncClient.subscribeWith()
.topicFilter("test/topic") // указываем топик
.qos(MqttQos.AT_LEAST_ONCE) // указываем Quality of Service
// 2.2.1. Обработка каждого полученного сообщения из топика подписки асинхронным обработчиком
.callback(consumedMsgCallback)
.send(); // отправляем команду на брокер
// 1.2. Использование предварительно созданной подписки
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.qos(MqttQos.EXACTLY_ONCE)
.build();
// 2.2.1. Обработка каждого полученного сообщения из топика подписки асинхронным обработчиком
asyncClient.subscribe(subscribeMessage, consumedMsgCallback);
Отключение клиента от брокера#
После окончания работы с клиентом, производится отключение от брокера:
blockingClient.disconnect();
Mqtt5Publish#
MQTT-клиенты используют объекты класса Mqtt5Publish:
В режиме публикации (pub) - для создания сообщения перед отправкой на брокер.
В режиме подписки (sub) - так как сообщения поступают с брокера на клиент в виде экземпляров объектов данного класса.
Для создания экземпляра класса Mqtt5Publish перед отправкой может использоваться билдер Mqtt5PublishBuilder, например:
// Настраиваем предварительно фильтр входящих сообщений из подписки
var publishes = client.publishes(MqttGlobalPublishFilter.SUBSCRIBED);
// 1. Создание публикации (экземпляра класса `Mqtt5Publish`) перед отправкой в топик на брокере
var msgForPublish = Mqtt5Publish.builder()
.topic("test/topic")
.payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
.qos(MqttQos.AT_LEAST_ONCE)
.build();
// Отправка сообщения (экземпляра класса `Mqtt5Publish`) в топик на брокере (pub-режим)
client.publish(msgForPublish);
// 2. Получение публикации (экземпляра класса `Mqtt5Publish`) из топика подписки брокера (sub-режим)
Mqtt5Publish publishMessage = publishes.receive();
Сообщения публикуются с обязательным указанием топика.
Топик необходим брокеру для рассылки сообщений подписчикам и является единственным обязательным свойством объекта класса
Mqtt5Publish.
Остальные свойства не являются обязательными. В общем случае публикация может содержать следующие поля:
Свойство |
Назначение |
Тип |
getter |
|---|---|---|---|
topic |
имя целевого топика публикации |
String |
getTopic() |
payload |
полезная нагрузка (тело сообщения), представляется всегда в байтовом виде |
byte[] / ByteBuffer |
getPayloadAsBytes() / getPayload() |
qos |
уровень гарантии доставки |
MqttQos |
getQos() |
retain |
признак retain-сообщения |
boolean |
isRetain() |
messageExpiryInterval |
интервал истечения срока действия сообщения (в секундах) |
OptionalLong |
getMessageExpiryInterval() |
payloadFormatIndicator |
индикатор типа тела сообщения ( |
Mqtt5PayloadFormatIndicator |
getPayloadFormatIndicator() |
contentType |
тип содержимого тела сообщения (стандартный MIME-тип), игнорируется брокером |
String / MqttUtf8String |
contentType() |
userProperties |
заголовки сообщения (набор строковых пар «ключ-значение») |
Mqtt5UserProperties |
getUserProperties() |
Гарантия доставки сообщений#
Mqtt поддерживает все три соглашения о гарантии доставки (Quality of Service):
QoS 0(MqttQos.AT_MOST_ONCE) -at most once(максимум одно сообщение) - возможны потери сообщений в процессе доставки,QoS 1(MqttQos.AT_LEAST_ONCE)-at least once(минимум одно сообщение) - возможны дубликаты сообщений в процессе доставки,QoS 2(MqttQos.EXACTLY_ONCE) -exactly once(ровно одно сообщение) - доставка сообщений без потерь и дубликатов.
Retain Messages#
специальные retain-сообщения могут сохраняться для каждого топика, чтобы быть переданными в первую очередь каждому новому подписавшемуся на топик клиенту-консьюмеру;
retain-сообщения хранятся пока клиент явно их не удалит или не истечёт (expires) время их жизни.
Пользовательские заголовки сообщения#
Пользовательские заголовки (User Properties) - это определенные пользователем пары имен и значений, которые отправляются вместе с сообщением о публикации Mqtt.
Mqtt поддерживает строковый формат имен и значения в заголовках.
Добавить к публикации заголовки можно через специальный билдер несколькими способами, например:
// 2. Создание публикации (сообщения) для отправки в топик на брокере
// 2.1. Формирование заголовков на основе готового набора свойств (системных свойств ОС)
List<Mqtt5UserProperty> msgHeaders = System.getProperties().entrySet().stream()
.filter(entry -> entry.getKey().toString().startsWith("os"))
.map(entry -> Mqtt5UserProperty.of(entry.getKey().toString(), entry.getValue().toString()))
.collect(Collectors.toList());
// 2.2. Формирование заголовка
Mqtt5UserProperty header2 = Mqtt5UserProperty.of("header2", "value2");
// 2.3. Создание публикации с помощью билдера
var msgForPublish = Mqtt5Publish.builder()
.topic("test/topic")
.payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
.qos(MqttQos.AT_LEAST_ONCE)
.userProperties() // использование билдера заголовков
.add("header1", "value1") // непосредственное добавление пары строк "ключ-значение"
.add(header2) // добавление заголовка как сущности `Mqtt5UserProperty`
.addAll(msgHeaders) // добавление коллекции заголовков
.applyUserProperties()
.build();
Mqtt5Subscription#
MQTT-клиенты используют объекты класса Mqtt5Subscription для настройки параметров будущей подписки.
При оформлении подписки через объект класса Mqtt5Subscribe, экземпляры класса Mqtt5Subscription могут добавляться к подписке специальным методом addSubscription().
Соответственно, список привязанных к объекту Mqtt5Subscribe подписок (экземпляров Mqtt5Subscription) можно получить методом getSubscriptions().
Например:
// Оформление подписки
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
.topicFilter("test/topic")
.addSubscription() // добавление дополнительной подписки с помощью билдера
.topicFilter("test/global") // параметр подписки
.qos(MqttQos.EXACTLY_ONCE) // параметр подписки
.noLocal(true) // параметр подписки
.applySubscription() // применение подписки
.build();
// Получение списка привязанных подписок из сообщения о подписке
var subscriptions = subscribeMessage.getSubscriptions();
System.out.println("Subscriptions: " + subscriptions);
Подписка формируется с обязательным указанием фильтра топика.
Фильтр топика позволяет брокеру рассылать подписчикам сообщения сразу из нескольких топиков
В фильтре можно заменять части имени топика wildcard-символами. В MQTT спецификации при указании имени группы топиков для подписки используются wildcard-символы:
/- разделитель частей составного имени;#- шаблон для произвольной последовательности частей (слов) в имени;+- шаблон для одной части (слова) в имени.
В зависимости от содержимого фильтра топика различают следующие виды MQTT-подписок:
обычная подписка - фильтр не содержит wildcard-символов и представляет полное имя топика;
wildcard-подписка - фильтр содержит wildcard-символы, что позволяет ему подписаться сразу на несколько топиков;
shared-подписка - задаётся в специальном формате:
$share/<shareName>/<topicFilter>, с указанием имени группы (<shareName>) и фильтра топика (<topicFilter>).
В MQTT v.5 можно оформлять общую подписку (
shared subscriptions) сразу для группы подписчиков (subscription group). Сообщения из топика будут распределяться по подписчикам из одной группы по очереди (round-robinалгоритмом). Общие подписки могут быть полезны для балансировки нагрузки и обеспечения отказоустойчивости.
В общем случае подписка может содержать следующие поля:
Свойство |
Назначение |
Тип |
getter |
|---|---|---|---|
topicFilter |
фильтр имени топика публикации (могут использоваться wildcard-символы: #, +) |
String или MqttTopicFilter |
getTopicFilter() |
qos |
уровень гарантии доставки |
MqttQos |
getQos() |
retainHandling |
вид обработки retain-сообщений для этой подписки ( |
Mqtt5RetainHandling |
getRetainHandling() |
retainAsPublished |
должен ли retain-флаг для входящих сообщений о публикации быть установлен в исходное значение |
boolean |
isRetainAsPublished() |
noLocal |
флаг, показывающий, что сообщения из подписки будут получены только если отправлены другими клиентами, а не себе самому |
boolean |
isNoLocal() |
Режимы коммитов#
Для обеспечения надежности доставки сообщений между клиентами и брокерами, протокол MQTT использует подтверждение успешной вычитки сообщений (Message Acknowledgement).
Когда в топике публикуется сообщение с уровнем QoS 1 или QoS 2, отправитель ожидает подтверждения от получателя о том, что сообщение получено и успешно обработано.
По умолчанию это автоматическая операция, которая выполняется в фоновом режиме.
В качестве альтернативы, Java-клиент MQTT HiveMQ поддерживает подтверждение полученных сообщений в ручном режиме (Manual Message Acknowledgement).
В этом случае получатель (subscriber) явно подтверждает сообщение после его обработки, предоставляя отправителю гарантии успешной вычитки и обработки публикаций.
Например:
asyncClient.subscribeWith()
.topicFilter("test/topic")
.manualAcknowledgement(true) // включить подтверждение полученных сообщений в ручном режиме для данной подписки
.callback(publish -> { // обработчик сообщений подписки, поступающих с брокера
boolean success = false;
// Некая логика обработки входящего сообщения
if (success) {
publish.acknowledge(); // Обусловленное некоторой логикой обработки подтверждение получения сообщения
}
})
.send()
.join();
Mqtt5Connect#
Экземпляр Mqtt5Connect класса используется для настройки подключения клиента к брокеру.
Можно подключить клиент MQTT без необходимости указывать аргументы.
При этом будут использоваться параметры по умолчанию, определенные в спецификации MQTT, или разумные значения по умолчанию, если они там не определены.
Например:
// Подключение с параметрами по умолчанию
client.connect();
// Подключение с передачей пользовательских значений параметров
client.connectWith().keepAlive(10).send();
// Предварительное создание объекта Mqtt5Connect с помощью билдера
Mqtt5Connect connectMessage = Mqtt5Connect.builder().keepAlive(10).build();
// Подключение с передачей объекта Mqtt5Connect
client.connect(connectMessage);
Параметры клиентского подключения#
Допустимы следующие параметры подключения:
Clean Start- определяет, хочет ли клиент начать новый “чистый” сеанс (true) или хочет возобновить предыдущий сеанс, если таковой имеется (false);Session Expiry Interval- интервал времени (в секундах), в течение которого сеанс будет продолжаться после отключения клиента;Keep Alive- интервал времени (в секундах), в течение которого клиент отправляет ping-запрос брокеру, если в течение этого периода времени не было отправлено никаких других пакетов MQTT. Используется для определения того, работает ли соединение по-прежнему;Will- это сообщение («завещание») публикуется брокером автоматически, если клиент отключился по неосторожности или с кодом причиныDISCONNECT_WITH_WILL_MESSAGE;Restrictions- можно указывать ограничения как на стороне брокера, так и на стороне клиента. Ограничения для сообщений, полученных от брокера, отправляются вместе с сообщением Mqtt 5 Connect. Другие параметры для сообщений, которые клиент отправляет сам, используются в сочетании с ограничениями, указанными брокером в сообщении Mqtt5ConnAck, для определения фактических ограничений на стороне клиента;User Properties- набор пользовательских заголовков, передаваемых вместе с сообщением о подключении.
Параметры конфигурации MQTT-протокола на стороне брокера#
Параметр |
Место |
Значение по умолчанию |
Описание |
|---|---|---|---|
|
broker.xml |
5000 мс |
определяет интервал сканирования сессий MQTT на сервере. Он указывает, как часто сервер будет проверять состояние сессий и удалять устаревшие сессии. Это может быть полезно для управления ресурсами сервера и предотвращения утечки памяти |
|
broker.xml |
5000 мс |
время ожидания брокером окончания записи на диск перед возбуждением исключения ( |
|
MQTT-акцептор |
5000 мс |
по умолчанию может быть настроен на MQTT-акцепторе таким образом, что заброшенные сеансы и очереди подписки будут автоматически очищаться по истечении интервала истечения срока действия |
|
MQTT-акцептор |
60 сек |
определяет максимальное время, в течение которого клиент может быть отключен от сервера, прежде чем сервер закроет сессию. Это может быть полезно для обеспечения надежности передачи сообщений и предотвращения утечки ресурсов сервера; значение, равное |
|
MQTT-акцептор |
true |
когда новый клиент подключается с тем же идентификатором клиента, что и у другого существующего клиента, сеанс существующего клиента будет закрыт. По умолчанию allowLinkStealing имеет значение true. Если для этого параметра установлено значение false, то всякий раз, когда новый клиент подключается с тем же идентификатором клиента, что и у другого существующего клиента, сеанс нового клиента будет закрыт |
|
MQTT-акцептор |
65 535 |
настройка flow control. Брокер может сообщить клиенту, сколько сообщений QoS 1 и QoS 2 он может получить до подтверждения, и наоборот. значение, равное 0, запрещено спецификацией MQTT 5; значение -1 не позволит брокеру информировать клиента о максимальном количестве получаемых данных, что означает, что управление потоком данных от клиентов к брокеру будет отключено. По сути, это то же самое, что установить значение 65 535, но уменьшает размер пакета CONNACK на несколько байт. |
|
MQTT-акцептор |
65 535 |
в MQTT 5 введено использование механизма числовых псевдонимов топиков (topic aliasing). Служит для оптимизация размера пакетов управления публикацией, поскольку теперь вместо строкового имени топика, которое потенциально может быть довольно длинным, можно использовать 2-байтовое целое значение; обе стороны, и клиент, и брокер могут сообщать друг другу о максимальном значении псевдонима, которое они поддерживают (т.е. сколько разных псевдонимов-чисел может быть использовано); значение по умолчанию равно 65 535 (максимальное значение 2-байтового целого числа, используемого MQTT); значение, равное 0, отключит передачу псевдонимов топиков от клиентов брокеру; значение, равное -1, не позволит брокеру информировать клиента о максимальном количестве псевдонимов топиков, что означает, что передача псевдонимов топиков от клиентов брокеру будет отключена. Это фактически то же самое, что установить значение равным 0, но уменьшает размер пакета CONNACK на несколько байт. |
|
MQTT-акцептор |
268 435 455 |
в MQTT 5 введен максимальный размер пакета. Это максимальный размер пакета, который сервер или клиент готов принять; значение 0 запрещено спецификацией MQTT 5; значение -1 не позволит брокеру информировать клиента о каком-либо максимальном размере пакета, что означает, что размер входящих пакетов не будет ограничен, что также уменьшает размер пакета CONNACK на несколько байт. |
|
MQTT-акцептор |
true |
Спецификация MQTT 3.1.1 неоднозначна в отношении поведения брокера при сбое публикации пакета из-за отсутствия авторизации (можно или закрыть соединение или дать положительное подтверждение); по дефолту брокер будет закрывать соединение; чтобы изменить это поведение на выдачу брокером положительного подтверждения нужно использовать URL-параметр конфигурации |
Mqtt5Disconnect#
Экземпляр Mqtt5Disconnect класса используется для отключения клиента от брокера.
Можно отключить клиент MQTT без необходимости указывать дополнительные аргументы. При этом будут использоваться параметры по умолчанию, определенные в спецификации MQTT, или разумные значения по умолчанию, если они там не определены.
Примеры:
// Отключение с параметрами по умолчанию
client.disconnect();
// Отключение с передачей пользовательских значений параметров
client.disconnectWith().reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE).send();
// Предварительное создание объекта Mqtt5Disconnect с помощью билдера
Mqtt5Disconnect disconnectMessage = Mqtt5Disconnect.builder().sessionExpiryInterval(100).build();
// Отключение с передачей объекта Mqtt5Disconnect
client.disconnect(disconnectMessage);
Параметры клиентского отключения#
Допустимы следующие параметры отключения:
Reason code- код возможной причины отключения (Mqtt5DisconnectReasonCode);Session Expiry Interval- интервал времени (в секундах), в течение которого сеанс будет продолжаться после отключения клиента;User Properties- набор пользовательских заголовков, передаваемых вместе с сообщением об отключении.
Примеры логирования#
Штатные логи классов клиента MQTT-протокола не формируются.
Необходимо полагаться на логирование в клиентском приложении.
Например, INFO-логи примера mqtt_1 выглядят следующим образом:
[main] ru.sbt.ss.artemis.clients.mqtt.ExMqtt1Run INFO - Блокирующий MQTT-клиент с отправкой и вычиткой сообщения (mqtt_1)
[main] ru.sbt.ss.artemis.clients.mqtt.ExMqtt1Run INFO - Отправлено сообщение: тело='Тело тестового сообщения', заголовки=[(header1, value1), (header2, value2), (os.name, Mac OS X), (os.version, 14.6), (os.arch, x86_64)]
[main] ru.sbt.ss.artemis.clients.mqtt.ExMqtt1Run INFO - Получено сообщение: тело='Тело тестового сообщения', заголовки=[(header1, value1), (header2, value2), (os.name, Mac OS X), (os.version, 14.6), (os.arch, x86_64)]