MQTT-клиенты#

MQTT-протокол (Message Queuing Telemetry Transport) - упрощённый сетевой протокол обмена сообщениями, ориентированный на обмен сообщениями между устройствами в парадигме «издатель-подписчик» (pub/sub). Протокол предназначен для небольших устройств и устройств с ограниченной пропускной способностью (IoT). Используется для передачи информации между удалёнными локациями в случае ограниченной пропускной способности канала и небольшого размера кода. MQTT работает поверх TCP/IP, но также может использоваться с WebSocket. Apache ActiveMQ Artemis обеспечивает поддержку MQTT версий 3.1, 3.1.1, 5.0. Рекомендуется использовать версию протокола 5.0. В этой версии реализована поддержки заголовков сообщений (user properties) и ряд прочих улучшений.

Предусловия использования#

Перед настройкой Mqtt-клиента должны быть выполнены следующие пункты:

Подключение зависимостей#

Документация Apache ActiveMQ Artemis рекомендует библиотеку org.fusesource.mqtt-client, но она не поддерживает версию протокола 5.0, поэтому была использована библиотека от HiveMQ. В этой библиотеке представлен широкий функционал, например: блокирующего, асинхронного и реактивного клиентов. Ознакомиться с полной функциональностью можно на сайте официальной документации.

<dependency>
    <groupId>com.hivemq</groupId>
    <artifactId>hivemq-mqtt-client</artifactId>
    <version>${mqtt.client.version}</version>
</dependency>

Пример создания и настройки простого клиента для отправки и чтения текстового сообщения#

Пример mqtt_1:

// Предполагается, что на брокере может быть создан топик `test/topic` (в терминологии Artemis: multicast-адрес `test.topic` с автосозданием очередей)
public static String TOPIC = "test/topic";

// 1. Создание MQTT-клиента для подключения к брокеру
var clientBuilder = MqttClient.builder().serverHost("0.0.0.0").serverPort(61616); // for plain TCP local
var client = clientBuilder
        .useMqttVersion5() // версия протокола MQTT
        .identifier("mqtt_test_app_id") // уникальный id клиента (одновременно к брокеру может быть подключен только один клиент с одинаковым id)
        .buildBlocking(); // создание блокирующего клиента (можно использовать не блокирующий, но тогда нужно писать в цикле чтение)
client.connect(); // подключение к брокеру

// 2. Создание публикации (сообщения) для отправки в топик на брокере
var msgForPublish = Mqtt5Publish.builder()
        .topic(TOPIC)
        .payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
        .qos(MqttQos.AT_LEAST_ONCE)
        .build();

// 3. Создание подписки на топик
client.subscribeWith() // оформление подписки на топик
        .topicFilter(TOPIC) // указываем топик (поддерживает wildcard)
        .qos(MqttQos.AT_LEAST_ONCE) // указываем Quality of Service
        .send(); // отправляем на брокер
var publishes = client.publishes(MqttGlobalPublishFilter.SUBSCRIBED); // настраиваем фильтр входящих сообщений из подписки

// 4. Отправка публикации в топик (работа клиента в режиме продюсера Artemis)
// важно оформить подписку на топик (создать консьюмера в п.3) до начала отправки сообщений продюсером иначе сообщение не будет доставлено
client.publish(msgForPublish); // отправка сообщения в топик на брокере

// 5. Вычитываем сообщение из топика подписки брокера (работа клиента в режиме консьюмера Artemis)
publishes.receive(5, TimeUnit.SECONDS)
        .ifPresent(msg -> log.info("Получено сообщение: '{}'", new String(msg.getPayloadAsBytes(), Charset.defaultCharset())));

// 6. Отключение клиента от брокера и закрытие ресурсов после окончания работы с брокером
client.disconnect();

Описание интерфейсов основных сущностей клиента для MQTT-протокола#

Клиентские интерфейсы, реализованные в библиотеке HiveMQ для разных версий MQTT-протокола существенно различаются, при этом номер версии протокола добавляется префиксом к имени интерфейса, например, Mqtt3Client или Mqtt5Client.Далее рассматриваются примеры использования основных интерфейсов только для Mqtt версии 5.0.

Основные интерфейсы клиента для MQTT-протокола#

Интерфейс

Краткое описание

Mqtt5Client

представляет сущность «Клиент» - инкапсулирует действия по работе в режимах публикации и подписки

Mqtt5Publish

представляет сущность «Публикация» - отправляемое или получаемое сообщение

Mqtt5Subscription

представляет сущность «Подписка»

Mqtt5Connect

настраивает клиентское подключение к брокеру

Mqtt5Disconnect

настраивает параметры отключения клиента от брокера

Интерфейсы обработчиков асинхронного варианта API#

Интерфейс

Где используется

Описание

Consumer<Mqtt5Publish>

subscribe(), callback()

callback для обработки входящих сообщений (публикаций), соответствующих оформленной подписке

Consumer<Mqtt5Publish>

publishes()

callback для обработки всех входящих сообщений (публикаций), соответствующих глобальному фильтру

Mqtt5Client#

MQTT-клиенты используют объекты класса Mqtt5Client для создания клиентского соединения с сервером, передачи основных настроек и команд коммуникации.

Создание экземпляра клиента#

Для создания объекта класса Mqtt5Client можно воспользоваться отдельным классом-билдером MqttClientBuilder, указав желаемую версию MQTT-протокола непосредственно перед созданием экземпляра клиента методом useMqttVersion5():

MqttClientBuilder сlientBuilder = MqttClient.builder()
        .identifier("mqtt_test_app_id")
        .serverHost("0.0.0.0")
        .serverPort(61616);

Mqtt5Client client = сlientBuilder
        .useMqttVersion5()
        .build();

Можно сразу использовать билдер MQTT-клиента нужной версии (в нашем случае - v.5.0):

Mqtt5ClientBuilder сlientBuilder = Mqtt5Client.builder()
        .identifier("mqtt_test_app_id")
        .serverHost("0.0.0.0")
        .serverPort(61616);

Mqtt5Client client = сlientBuilder.build();

Следует обратить внимание на важную роль идентификатора клиента, передаваемого в методе identifier(). При подключении нескольких клиентов с одинаковым идентификатором для работы в режиме «Подписчика», брокер оставит активным только одного. Логика выбора определяется настройкой параметра allowLinkStealing MQTT-акцептора на брокере (см. Параметры конфигурации MQTT-протокола на стороне брокера).

Идентификатор клиента также поможет в дальнейшем во время отладки и мониторинга клиентского приложения. Работу клиентского приложения теперь можно отслеживать в консоли управления брокера по значению Client ID = mqtt_test_app_id во вкладках Connections, Consumers, and Producers.

Для создания защищённого TLS-соединения нужно добавить параметры ssl-конфигурации, c использованием дополнительного билдера MqttClientSslConfigBuilder:

KeyManagerFactory keyManagerFactory = ...;      // настраиваем предварительно KeyManagerFactory
TrustManagerFactory trustManagerFactory = ...;  // настраиваем предварительно TrustManagerFactory
HostnameVerifier hostNameVerifier = ...;        // настраиваем предварительно HostnameVerifier

Mqtt5ClientBuilder сlientBuilder = Mqtt5Client.builder()
        .identifier("mqtt_test_app_id")
        .serverHost("0.0.0.0")
        .serverPort(61616);
        .sslConfig() // создаём дополнительный билдер MqttClientSslConfigBuilder
            .trustManagerFactory(trustManagerFactory)
            .hostnameVerifier(hostNameVerifier)
            .keyManagerFactory(keyManagerFactory)
            .applySslConfig();

Варианты API#

Реализация MQTT-клиента в библиотеке HiveMQ предлагает 3 различных способа работы клиента с брокером и, соответственно, 3 варианта, используемого при этом, API:

  • синхронный стиль (Blocking) - методы API напрямую возвращают результаты и блокируют текущий поток до тех пор, пока они не станут доступны;

  • асинхронный стиль (Asynchronous) - методы API возвращают; CompletableFuture вместо ожидания результата. Для потоков входящих сообщений (публикаций) используются callback-функции.

  • реактивный стиль (Reactive) - методы API используют RxJava (Reactive Streams и Reactive Extensions).

Выбор используемого стиля работы (варианта API) клиента может осуществляться:

  • напрямую при создании экземпляра клиента билдером - в этом случае, вместо вызова метода build(), следует использовать соответствующие методы: buildBlocking(), buildAsync() или buildRx(), например:

Mqtt5AsyncClient asyncClient = Mqtt5Client.builder().buildAsync();
  • путём переключения во время работы клиента - выполняется с помощью соответствующего метода: toBlocking(), toAsync(), toRx(),
    например:

Mqtt5Client client = ...; // может быть экземпляром Mqtt5BlockingClient, Mqtt5AsyncClient или Mqtt5RxClient
Mqtt5BlockingClient blockingClient = client.toBlocking();

Подключение клиента к брокеру#

После создания экземпляра MQTT-клиента с выбором требуемого режима работы, можно подключиться к брокеру методом connect():

blockingClient.connect();

Работа клиента с брокером#

Объект Mqtt5Client инкапсулирует внутри себя одновременно как работу в режиме Публикатора (продюсера сообщений на брокер Artemis) так и работу в режиме Подписчика (консьюмера сообщений с брокера Artemis)

Работа в режиме Публикатора (pub)#

Работа клиента в pub-режиме (продюсера сообщений на брокер Artemis) может производиться двумя способами:

  1. Формирование публикации «на лету» и его последующая передача методом publishWith().

  2. Передача заранее подготовленной публикации (объекта Mqtt5Publish) методом publish().

Пример:

// 1. Формирование сообщения "на лету" и его отправка на брокер
blockingClient.publishWith()
        .topic("test/topic")
        .qos(MqttQos.AT_LEAST_ONCE)
        .payload("Тело тестового сообщения".getBytes())
        .send();

// 2.1. Предварительное создание сообщения
Mqtt5Publish msgForPublish = Mqtt5Publish.builder()
        .topic("test/topic")
        .payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
        .qos(MqttQos.AT_LEAST_ONCE)
        .build();

// 2.2. Публикация сообщения
blockingClient.publish(msgForPublish); 
Работа в режиме Подписчика (sub)#

Для работы в режиме Подписчика (консьюмера сообщений с брокера Artemis) необходимо:

  1. Оформить подписку на топик одним из двух возможных способов:

    • 1.1. «на лету» - методом subscribeWith();

    • 1.2. с использованием заранее подготовленной подписки (объекта Mqtt5Subscribe) - методом subscribe().

  2. Вычитать сообщение из топика:

    • 2.1. в синхронном режиме - с помощью объекта Mqtt5Publishes (накопитель входящих публикаций):

      • 2.1.1. методом recieve() - с блокировкой на время чтения;

      • 2.1.2. методом recieve() - с тайм-аутом ожидания;

      • 2.1.3. методом recieveNow() - без блокировки на время чтения;

    • 2.2. в асинхронном режиме - с помощью callback-функции:

      • 2.2.1. в рамках оформленной подписки (методами subscribeWith(), subscribe())

      • 2.2.2. в глобальном слушателе сообщений (методом publishes())

Пример реализации синхронного Подписчика:

// 1.1. Подписка на сообщения топика "на лету"
blockingClient.subscribeWith()
    .topicFilter("test/topic")  // указываем топик
    .qos(MqttQos.AT_LEAST_ONCE) // указываем Quality of Service
    .send(); // отправляем команду на брокер

// 2.1. Создаём предварительно объект Mqtt5Publishes
// настраиваем фильтр входящих сообщений из подписки:
var publishes = blockingClient.publishes(MqttGlobalPublishFilter.SUBSCRIBED);
// Вычитывание сообщения из топика подписки:
// 2.1.1. с блокировкой на время чтения 
Mqtt5Publish publishMessage = publishes.receive();
// 2.1.2. с таймаутом ожидания 
Optional<Mqtt5Publish> publishMessage = publishes.receive(10, TimeUnit.SECONDS);
// 2.1.3. без блокировки на время чтения
Optional<Mqtt5Publish> publishMessage = publishes.receiveNow();

Пример реализации Подписчика на асинхронном API:

// Обработчик полученных с брокера сообщений
Consumer<Mqtt5Publish> consumedMsgCallback = publishMsg -> System.out.println(new String(publishMsg.getPayloadAsBytes()));

// 2.2.2. Глобальный слушатель всех полученных клиентом сообщений
// Метод `publishes()` должен быть вызван перед методом `subscribe()`, чтобы гарантировать, что сообщение не будет потеряно.
// Этот метод можно вызвать перед подключением (connect) для получения сообщений за предыдущий сеанс.
asyncClient.publishes(MqttGlobalPublishFilter.ALL, consumedMsgCallback);

// 1.1. Подписка "на лету"
asyncClient.subscribeWith()
    .topicFilter("test/topic")  // указываем топик
    .qos(MqttQos.AT_LEAST_ONCE) // указываем Quality of Service
    // 2.2.1. Обработка каждого полученного сообщения из топика подписки асинхронным обработчиком
    .callback(consumedMsgCallback)
    .send(); // отправляем команду на брокер

// 1.2. Использование предварительно созданной подписки
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
        .topicFilter("test/topic")
        .qos(MqttQos.EXACTLY_ONCE)
        .build();
// 2.2.1. Обработка каждого полученного сообщения из топика подписки асинхронным обработчиком
asyncClient.subscribe(subscribeMessage, consumedMsgCallback);

Отключение клиента от брокера#

После окончания работы с клиентом, производится отключение от брокера:

blockingClient.disconnect();

Mqtt5Publish#

MQTT-клиенты используют объекты класса Mqtt5Publish:

  1. В режиме публикации (pub) - для создания сообщения перед отправкой на брокер.

  2. В режиме подписки (sub) - так как сообщения поступают с брокера на клиент в виде экземпляров объектов данного класса.

Для создания экземпляра класса Mqtt5Publish перед отправкой может использоваться билдер Mqtt5PublishBuilder, например:

// Настраиваем предварительно фильтр входящих сообщений из подписки
var publishes = client.publishes(MqttGlobalPublishFilter.SUBSCRIBED);

// 1. Создание публикации (экземпляра класса `Mqtt5Publish`) перед отправкой в топик на брокере
var msgForPublish = Mqtt5Publish.builder()
        .topic("test/topic")
        .payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
        .qos(MqttQos.AT_LEAST_ONCE)
        .build();
// Отправка сообщения (экземпляра класса `Mqtt5Publish`) в топик на брокере (pub-режим)
client.publish(msgForPublish);

// 2. Получение публикации (экземпляра класса `Mqtt5Publish`) из топика подписки брокера (sub-режим)
Mqtt5Publish publishMessage = publishes.receive();

Сообщения публикуются с обязательным указанием топика.

Топик необходим брокеру для рассылки сообщений подписчикам и является единственным обязательным свойством объекта класса Mqtt5Publish.

Остальные свойства не являются обязательными. В общем случае публикация может содержать следующие поля:

Свойство

Назначение

Тип

getter

topic

имя целевого топика публикации

String

getTopic()

payload

полезная нагрузка (тело сообщения), представляется всегда в байтовом виде

byte[] / ByteBuffer

getPayloadAsBytes() / getPayload()

qos

уровень гарантии доставки

MqttQos

getQos()

retain

признак retain-сообщения

boolean

isRetain()

messageExpiryInterval

интервал истечения срока действия сообщения (в секундах)

OptionalLong

getMessageExpiryInterval()

payloadFormatIndicator

индикатор типа тела сообщения (UTF_8 - текстовое, UNSPECIFIED - бинарное)

Mqtt5PayloadFormatIndicator

getPayloadFormatIndicator()

contentType

тип содержимого тела сообщения (стандартный MIME-тип), игнорируется брокером Artemis

String / MqttUtf8String

contentType()

userProperties

заголовки сообщения (набор строковых пар «ключ-значение»)

Mqtt5UserProperties

getUserProperties()

Гарантия доставки сообщений#

Mqtt поддерживает все три соглашения о гарантии доставки (Quality of Service):

  • QoS 0 (MqttQos.AT_MOST_ONCE) - at most once (максимум одно сообщение) - возможны потери сообщений в процессе доставки,

  • QoS 1 (MqttQos.AT_LEAST_ONCE)- at least once (минимум одно сообщение) - возможны дубликаты сообщений в процессе доставки,

  • QoS 2 (MqttQos.EXACTLY_ONCE) - exactly once (ровно одно сообщение) - доставка сообщений без потерь и дубликатов.

Retain Messages#

  • специальные retain-сообщения могут сохраняться для каждого топика, чтобы быть переданными в первую очередь каждому новому подписавшемуся на топик клиенту-консьюмеру;

  • retain-сообщения хранятся пока клиент явно их не удалит или не истечёт (expires) время их жизни.

Пользовательские заголовки сообщения#

Пользовательские заголовки (User Properties) - это определенные пользователем пары имен и значений, которые отправляются вместе с сообщением о публикации Mqtt. Mqtt поддерживает строковый формат имен и значения в заголовках. Добавить к публикации заголовки можно через специальный билдер несколькими способами, например:

// 2. Создание публикации (сообщения) для отправки в топик на брокере
// 2.1. Формирование заголовков на основе готового набора свойств (системных свойств ОС)
List<Mqtt5UserProperty> msgHeaders = System.getProperties().entrySet().stream()
        .filter(entry -> entry.getKey().toString().startsWith("os"))
        .map(entry -> Mqtt5UserProperty.of(entry.getKey().toString(), entry.getValue().toString()))
        .collect(Collectors.toList());
// 2.2. Формирование заголовка
Mqtt5UserProperty header2 = Mqtt5UserProperty.of("header2", "value2");
// 2.3. Создание публикации с помощью билдера
var msgForPublish = Mqtt5Publish.builder()
        .topic("test/topic")
        .payload("Тело тестового сообщения".getBytes()) // добавляем тело сообщения как массив байт
        .qos(MqttQos.AT_LEAST_ONCE)
        .userProperties() // использование билдера заголовков
            .add("header1", "value1") // непосредственное добавление пары строк "ключ-значение"
            .add(header2)             // добавление заголовка как сущности `Mqtt5UserProperty`
            .addAll(msgHeaders)       // добавление коллекции заголовков
            .applyUserProperties()    
        .build();

Mqtt5Subscription#

MQTT-клиенты используют объекты класса Mqtt5Subscription для настройки параметров будущей подписки. При оформлении подписки через объект класса Mqtt5Subscribe, экземпляры класса Mqtt5Subscription могут добавляться к подписке специальным методом addSubscription(). Соответственно, список привязанных к объекту Mqtt5Subscribe подписок (экземпляров Mqtt5Subscription) можно получить методом getSubscriptions().

Например:

// Оформление подписки
Mqtt5Subscribe subscribeMessage = Mqtt5Subscribe.builder()
    .topicFilter("test/topic")
    .addSubscription()             // добавление дополнительной подписки с помощью билдера
        .topicFilter("test/global")     // параметр подписки
        .qos(MqttQos.EXACTLY_ONCE)      // параметр подписки
        .noLocal(true)                  // параметр подписки
        .applySubscription()            // применение подписки
    .build();

// Получение списка привязанных подписок из сообщения о подписке
var subscriptions = subscribeMessage.getSubscriptions();
System.out.println("Subscriptions: " + subscriptions);

Подписка формируется с обязательным указанием фильтра топика.

Фильтр топика позволяет брокеру рассылать подписчикам сообщения сразу из нескольких топиков

В фильтре можно заменять части имени топика wildcard-символами. В MQTT спецификации при указании имени группы топиков для подписки используются wildcard-символы:

  • / - разделитель частей составного имени;

  • # - шаблон для произвольной последовательности частей (слов) в имени;

  • + - шаблон для одной части (слова) в имени.

В зависимости от содержимого фильтра топика различают следующие виды MQTT-подписок:

  • обычная подписка - фильтр не содержит wildcard-символов и представляет полное имя топика;

  • wildcard-подписка - фильтр содержит wildcard-символы, что позволяет ему подписаться сразу на несколько топиков;

  • shared-подписка - задаётся в специальном формате: $share/<shareName>/<topicFilter>, с указанием имени группы (<shareName>) и фильтра топика (<topicFilter>).

В MQTT v.5 можно оформлять общую подписку (shared subscriptions) сразу для группы подписчиков (subscription group). Сообщения из топика будут распределяться по подписчикам из одной группы по очереди (round-robin алгоритмом). Общие подписки могут быть полезны для балансировки нагрузки и обеспечения отказоустойчивости.

В общем случае подписка может содержать следующие поля:

Свойство

Назначение

Тип

getter

topicFilter

фильтр имени топика публикации (могут использоваться wildcard-символы: #, +)

String или MqttTopicFilter

getTopicFilter()

qos

уровень гарантии доставки

MqttQos

getQos()

retainHandling

вид обработки retain-сообщений для этой подписки (SEND, DO_NOT_SEND, SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST)

Mqtt5RetainHandling

getRetainHandling()

retainAsPublished

должен ли retain-флаг для входящих сообщений о публикации быть установлен в исходное значение

boolean

isRetainAsPublished()

noLocal

флаг, показывающий, что сообщения из подписки будут получены только если отправлены другими клиентами, а не себе самому

boolean

isNoLocal()

Режимы коммитов#

Для обеспечения надежности доставки сообщений между клиентами и брокерами, протокол MQTT использует подтверждение успешной вычитки сообщений (Message Acknowledgement). Когда в топике публикуется сообщение с уровнем QoS 1 или QoS 2, отправитель ожидает подтверждения от получателя о том, что сообщение получено и успешно обработано. По умолчанию это автоматическая операция, которая выполняется в фоновом режиме. В качестве альтернативы, Java-клиент MQTT HiveMQ поддерживает подтверждение полученных сообщений в ручном режиме (Manual Message Acknowledgement). В этом случае получатель (subscriber) явно подтверждает сообщение после его обработки, предоставляя отправителю гарантии успешной вычитки и обработки публикаций.

Например:

asyncClient.subscribeWith()
    .topicFilter("test/topic")
    .manualAcknowledgement(true) // включить подтверждение полученных сообщений в ручном режиме для данной подписки
    .callback(publish -> { // обработчик сообщений подписки, поступающих с брокера
        boolean success = false;

        // Некая логика обработки входящего сообщения

        if (success) {
            publish.acknowledge();  // Обусловленное некоторой логикой обработки подтверждение получения сообщения
        }
    })
    .send()
    .join();

Mqtt5Connect#

Экземпляр Mqtt5Connect класса используется для настройки подключения клиента к брокеру. Можно подключить клиент MQTT без необходимости указывать аргументы. При этом будут использоваться параметры по умолчанию, определенные в спецификации MQTT, или разумные значения по умолчанию, если они там не определены.

Например:

// Подключение с параметрами по умолчанию
client.connect();

// Подключение с передачей пользовательских значений параметров
client.connectWith().keepAlive(10).send();

// Предварительное создание объекта Mqtt5Connect с помощью билдера
Mqtt5Connect connectMessage = Mqtt5Connect.builder().keepAlive(10).build();

// Подключение с передачей объекта Mqtt5Connect
client.connect(connectMessage);

Параметры клиентского подключения#

Допустимы следующие параметры подключения:

  • Clean Start - определяет, хочет ли клиент начать новый “чистый” сеанс (true) или хочет возобновить предыдущий сеанс, если таковой имеется (false);

  • Session Expiry Interval - интервал времени (в секундах), в течение которого сеанс будет продолжаться после отключения клиента;

  • Keep Alive - интервал времени (в секундах), в течение которого клиент отправляет ping-запрос брокеру, если в течение этого периода времени не было отправлено никаких других пакетов MQTT. Используется для определения того, работает ли соединение по-прежнему;

  • Will - это сообщение («завещание») публикуется брокером автоматически, если клиент отключился по неосторожности или с кодом причины DISCONNECT_WITH_WILL_MESSAGE;

  • Restrictions - можно указывать ограничения как на стороне брокера, так и на стороне клиента. Ограничения для сообщений, полученных от брокера, отправляются вместе с сообщением Mqtt 5 Connect. Другие параметры для сообщений, которые клиент отправляет сам, используются в сочетании с ограничениями, указанными брокером в сообщении Mqtt5ConnAck, для определения фактических ограничений на стороне клиента;

  • User Properties - набор пользовательских заголовков, передаваемых вместе с сообщением о подключении.

Параметры конфигурации MQTT-протокола на стороне брокера#

Параметр

Место

Значение по умолчанию

Описание

mqtt-session-scan-interval

broker.xml

5000 мс

определяет интервал сканирования сессий MQTT на сервере. Он указывает, как часто сервер будет проверять состояние сессий и удалять устаревшие сессии. Это может быть полезно для управления ресурсами сервера и предотвращения утечки памяти

mqtt-session-state-persistence-timeout

broker.xml

5000 мс

время ожидания брокером окончания записи на диск перед возбуждением исключения (throwing an error). настройка определяет время, в течение которого состояние сессии MQTT сохраняется на сервере. Если клиент отключается от сервера, то сервер сохраняет состояние сессии на заданное время, чтобы клиент мог восстановить сессию при повторном подключении. Если время истекает, то состояние сессии удаляется и клиент должен начать новую сессию. Эта настройка может быть полезна для обеспечения надежности передачи сообщений и сохранения состояния сессии при временных отключениях клиента

defaultMqttSessionExpiryInterval

MQTT-акцептор

5000 мс

по умолчанию может быть настроен на MQTT-акцепторе таким образом, что заброшенные сеансы и очереди подписки будут автоматически очищаться по истечении интервала истечения срока действия

serverKeepAlive

MQTT-акцептор

60 сек

определяет максимальное время, в течение которого клиент может быть отключен от сервера, прежде чем сервер закроет сессию. Это может быть полезно для обеспечения надежности передачи сообщений и предотвращения утечки ресурсов сервера; значение, равное 0, полностью отключает функцию keep alives, независимо от того, установлено ли для клиента значение keep alive. Делать подобное не рекомендуется, поскольку отключение функции keep alives обычно считается опасным, так как может привести к исчерпанию ресурсов; значение -1 означает, что брокер всегда будет принимать значение keep alive для клиента (даже если это значение равно 0); любое другое значение означает, что serverKeepAlive будет применено, если оно меньше значения keep alive для клиента (за исключением, когда клиентский keep alive равен 0 и будет приоритетно применено значение serverKeepAlive, чтобы предотвратить небезопасное отключение функции keep alives).

allowLinkStealing

MQTT-акцептор

true

когда новый клиент подключается с тем же идентификатором клиента, что и у другого существующего клиента, сеанс существующего клиента будет закрыт. По умолчанию allowLinkStealing имеет значение true. Если для этого параметра установлено значение false, то всякий раз, когда новый клиент подключается с тем же идентификатором клиента, что и у другого существующего клиента, сеанс нового клиента будет закрыт

receiveMaximum

MQTT-акцептор

65 535

настройка flow control. Брокер может сообщить клиенту, сколько сообщений QoS 1 и QoS 2 он может получить до подтверждения, и наоборот. значение, равное 0, запрещено спецификацией MQTT 5; значение -1 не позволит брокеру информировать клиента о максимальном количестве получаемых данных, что означает, что управление потоком данных от клиентов к брокеру будет отключено. По сути, это то же самое, что установить значение 65 535, но уменьшает размер пакета CONNACK на несколько байт.

topicAliasMaximum

MQTT-акцептор

65 535

в MQTT 5 введено использование механизма числовых псевдонимов топиков (topic aliasing). Служит для оптимизация размера пакетов управления публикацией, поскольку теперь вместо строкового имени топика, которое потенциально может быть довольно длинным, можно использовать 2-байтовое целое значение; обе стороны, и клиент, и брокер могут сообщать друг другу о максимальном значении псевдонима, которое они поддерживают (т.е. сколько разных псевдонимов-чисел может быть использовано); значение по умолчанию равно 65 535 (максимальное значение 2-байтового целого числа, используемого MQTT); значение, равное 0, отключит передачу псевдонимов топиков от клиентов брокеру; значение, равное -1, не позволит брокеру информировать клиента о максимальном количестве псевдонимов топиков, что означает, что передача псевдонимов топиков от клиентов брокеру будет отключена. Это фактически то же самое, что установить значение равным 0, но уменьшает размер пакета CONNACK на несколько байт.

maximumPacketSize

MQTT-акцептор

268 435 455

в MQTT 5 введен максимальный размер пакета. Это максимальный размер пакета, который сервер или клиент готов принять; значение 0 запрещено спецификацией MQTT 5; значение -1 не позволит брокеру информировать клиента о каком-либо максимальном размере пакета, что означает, что размер входящих пакетов не будет ограничен, что также уменьшает размер пакета CONNACK на несколько байт.

closeMqttConnectionOnPublishAuthorizationFailure

MQTT-акцептор

true

Спецификация MQTT 3.1.1 неоднозначна в отношении поведения брокера при сбое публикации пакета из-за отсутствия авторизации (можно или закрыть соединение или дать положительное подтверждение); по дефолту брокер будет закрывать соединение; чтобы изменить это поведение на выдачу брокером положительного подтверждения нужно использовать URL-параметр конфигурации

Mqtt5Disconnect#

Экземпляр Mqtt5Disconnect класса используется для отключения клиента от брокера.

Можно отключить клиент MQTT без необходимости указывать дополнительные аргументы. При этом будут использоваться параметры по умолчанию, определенные в спецификации MQTT, или разумные значения по умолчанию, если они там не определены.

Примеры:

// Отключение с параметрами по умолчанию
client.disconnect();

// Отключение с передачей пользовательских значений параметров
client.disconnectWith().reasonCode(Mqtt5DisconnectReasonCode.DISCONNECT_WITH_WILL_MESSAGE).send();

// Предварительное создание объекта Mqtt5Disconnect с помощью билдера
Mqtt5Disconnect disconnectMessage = Mqtt5Disconnect.builder().sessionExpiryInterval(100).build();

// Отключение с передачей объекта Mqtt5Disconnect
client.disconnect(disconnectMessage);

Параметры клиентского отключения#

Допустимы следующие параметры отключения:

  • Reason code - код возможной причины отключения (Mqtt5DisconnectReasonCode);

  • Session Expiry Interval - интервал времени (в секундах), в течение которого сеанс будет продолжаться после отключения клиента;

  • User Properties - набор пользовательских заголовков, передаваемых вместе с сообщением об отключении.

Примеры логирования#

Штатные логи классов клиента MQTT-протокола не формируются. Необходимо полагаться на логирование в клиентском приложении. Например, INFO-логи примера mqtt_1 выглядят следующим образом:

[main] ru.sbt.ss.artemis.clients.mqtt.ExMqtt1Run INFO  - Блокирующий MQTT-клиент с отправкой и вычиткой сообщения (mqtt_1)
[main] ru.sbt.ss.artemis.clients.mqtt.ExMqtt1Run INFO  - Отправлено сообщение: тело='Тело тестового сообщения', заголовки=[(header1, value1), (header2, value2), (os.name, Mac OS X), (os.version, 14.6), (os.arch, x86_64)]
[main] ru.sbt.ss.artemis.clients.mqtt.ExMqtt1Run INFO  - Получено сообщение: тело='Тело тестового сообщения', заголовки=[(header1, value1), (header2, value2), (os.name, Mac OS X), (os.version, 14.6), (os.arch, x86_64)]