JMS-клиент#

Протокол Core (JMS-клиент)#

Core-протокол (Artemis CORE protocol) - это простой интуитивно понятный Java API,
который используется внутренним ядром брокера Artemis, а также доступен на стороне клиентских приложений.
Позволяет управлять большинством объектов брокера, реализует полный набор функций обмена сообщениями.
Возможна работа по данному протоколу посредством клиентского JMS API.

Предусловия использования#

Перед настройкой JMS-клиента должны быть выполнены следующие пункты:

Подключение зависимостей#

 <dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>artemis-jms-client</artifactId>
	<version>${artemis.version}</version>
 </dependency>  

Описание интерфейсов основных сущностей клиентского JMS API#

Основные интерфейсы клиентского JMS API#

Интерфейс

Краткое описание

ConnectionFactory

инкапсулирует набор параметров конфигурации подключений

Connection

представляет активное клиентское подключение к брокеру

Queue

инкапсулирует особенности представления имени очереди сообщений

Session

однопоточный родительский объект, необходимый для отправки и приема сообщений

MessageConsumer

получает сообщения из очередей брокера

MessageProducer

используется для отправки сообщений в очередь на брокере

Message

представляет родительский интерфейс сообщений, отправленных или полученных через брокер

Интерфейсы слушателей и обработчиков клиентского JMS API#

Интерфейс

Краткое описание

CompletionListener

используется для асинхронной передачи сообщений продюсером (MessageProducer)

ExceptionListener

уведомляет клиент (Connection), когда при подключении произошел сбой

MessageListener

используется для асинхронного получения сообщений

ConnectionFactory#

JMS Core-клиенты используют объекты ConnectionFactory для создания объектов Connection.
ConnectionFactory используются для конфигурирования необходимых параметров подключения к брокеру. Объекты ConnectionFactory могут создаваться с использованием фабрики ActiveMQConnectionFactory, например:

// Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения  
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616?tcpReceiveBufferSize=1048576;tcpSendBufferSize=1048576");  

Connection#

JMS Core-клиенты используют объекты Connection для создания активного клиентского подключения к серверу.
Объекты Connection создаются с использованием класса ConnectionFactory, например:

// Создаем JMS-соединение с брокером  
Connection connection = cf.createConnection();  

После инициализации объекта подключения (перед первым его использованием) можно указать уникальный идентификатор клиента (clientId), который поможет в дальнейшем во время отладки и мониторинга клиентского приложения, например:

connection.setClientID("jms-core-client-app-id");  

Работу клиентского приложения теперь можно отслеживать в консоли управления брокера по значению Client ID = jms-core-client-app-id во вкладках Connections, Consumers, and Producers.

ExceptionListener#

Объекты подключения могут дополнительно регистрировать опциональный обработчик (объект класса, реализующего интерфейс ExceptionListener), через который клиентский код будет получать асинхронные уведомления о фатальных ошибках, возникающих при подключении к брокеру, например:

// Регистрируем обработчик ошибок подключения  
connection.setExceptionListener(e -> System.out.println("Ошибка соединения с брокером: " + e.getMessage()));  

Queue#

Объект Queue инкапсулирует имя очереди, зависящее от поставщика, в нашем случае, имя адреса или очереди ActiveMQ Artemis.
Для тех методов, которые используют очередь или адрес в качестве параметра, объект Queue используется в качестве аргумента.
Например, очередь нужно использовать при создании MessageConsumer и MessageProducer:

// Создаем объект JMS-очередь для очереди `test-address::queue-1` на брокере  
Queue queue = ActiveMQJMSClient.createQueue("test-address::queue-1");  

Session#

JMS Core-клиенты используют объекты Session (сессии) для получения и отправления сообщений, для группировки их в транзакциях. Объекты Session могут поддерживать как транзакционную, так и нетранзакционную семантику.
Сессии группируют внутри себя объекты MessageConsumer (консьюмер) и MessageProducer (продюсер), например:

// Создаем JMS-сессию для поддержки работы с потреблением и поставкой событий брокеру  
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Поддержка автоматических подтверждений получения сообщений  
  
// Создаем поставщик (продюсер) сообщений в адрес на брокере  
MessageProducer producer = session.createProducer(address);  
  
// Создаем потребителель (консьюмер) сообщений из очереди на брокере  
MessageConsumer messageConsumer = session.createConsumer(queue);  
  
// Создаем текстовое сообщение для отправки в очередь  
TextMessage message = session.createTextMessage("This is a text message");  

MessageConsumer#

Jms-клиенты используют объекты MessageConsumer (консьюмеры) для вычитки (потребления) сообщений из очереди брокера.
После вычитки сообщения консьюмером происходит его обработка и, когда этот процесс завершается, для каждого сообщения на сервер отправляется подтверждение (ack) его успешного получения.
Как только сообщение подтверждено, оно исчезает из очереди и становится недоступным для повторной доставки.
Если система выйдет из строя до того, как сервер обмена сообщениями получит подтверждение от пользователя, то после восстановления сообщение будет доступно для повторной доставки (redelivery) пользователю.
Jms API поддерживает как синхронную, так и асинхронную семантику потребления сообщений.
Потребление в синхронном режиме осуществляется с использованием метода receive(), вызов которого приводит к блокировке клиентского приложения до момента получения сообщения (или истечении тайм-аута ожидания), например:

// Создаем потребителель (консьюмер) сообщений из очереди на брокере  
MessageConsumer messageConsumer = session.createConsumer(queue);  
  
// Стартуем соединение, для того чтобы консьюмер смог начать получать сообщения с брокера  
connection.start();  
  
// Вычитываем сообщение из брокера с помощью консьюмера  
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);  

Потребление в асинхронном режиме производится при использовании транзакций или путем задания обработчика MessageListener.

MessageListener#

Обработчик асинхронного получения сообщений с брокера можно зарегистрировать для объекта консьюмера следующим образом:

// Регистрируем обработчик асинхронного получения сообщений с брокера  
consumer.setMessageListener(new MessageListener() {  
	@Override
	public void onMessage(Message message) {
		try {
			var messageReceived = (TextMessage) message;
			System.out.println("Консьюмер получил с брокера сообщение: " + messageReceived.getText());
			messageReceived.acknowledge(); // Подтверждаем факт успешной вычитки сообщения
		} catch (JMSException e) {
			throw new RuntimeException(e); 
		} 
	}
});  

Cм. пример 2.

MessageProducer#

Jms-клиенты используют объекты MessageProducer (продюсеры) для отправки сообщений в определенную очередь брокера.
При создании продюсера необходимо сразу задать очередь назначения, на которую будут маршрутизироваться все отправляемые методом send() сообщения, например:

// Создаем объект JMS-очередь для полного имени (FQQN) очереди `test-address::queue-1` на брокере  
Queue queue = ActiveMQJMSClient.createQueue("test-address::queue-1");  
  
// Создаем продюсер сообщений в очередь  
MessageProducer producer = session.createProducer(queue);  
  
// Отправляем сообщение с помощью продюсера на заранее привязанную к продюсеру очередь `test-address::queue-1`  
producer.send(message);  

Специальные URL-параметры объекта фабрики соединений, позволяют управлять темпом отправки сообщений:

  • producerMaxRate - устанавливает ограничение на максимальную скорость производства сообщений (число сообщений в секунду), -1 - означает снятие всех ограничений и используется по умолчанию;

  • producerWindowSize - устанавливает размер окна отправки (буфера сообщений в байтах) для сокращения числа запросов к серверу при работе с большим количеством сообщений.

Режимы отправки#

Существует два разных режима отправки сообщений продюсером: синхронный и асинхронный.

Синхронный режим#

Синхронный режим отправки означает, что продюсер будет блокироваться до тех пор, пока сообщение не будет получено брокером и сервер не уведомит клиента об этом.
Включать использование блокирующей семантики отправки можно с помощью следующих URL-параметров объекта фабрики соединений:

  • blockOnDurableSend - включение (true)/выключение (false) блокировки отдельно для персистентных (durable) сообщений;

  • blockOnNonDurableSend - включение (true)/выключение (false) блокировки отдельно для неперсистентных сообщений (true/false).

Необходимо помнить, что производительность отправки сообщений в синхронном режиме будет существенно ограничена сетевой задержкой по транспортировке сообщения и получению обратного подтверждения от сервера Round Trip Time (RTT) и не будет зависеть от пропускной способности сети.
Для того чтобы избежать подобных издержек, необходимо воспользоваться возможностью асинхронного режима отправки, или использовать транзакции с отправкой в пакетном режиме.

Асинхронный режим#

Асинхронный режим отправки означает, что продюсер не будет блокироваться и может продолжать работу, пока сообщение будет доставляться на сервер.
При этом уведомления (acknowledgement) о получении сообщения на брокере будут в ответ пересылаться в отдельном потоке через callback-обработчик CompletionListener клиентского приложения.
Асинхронный режим может быть полезен при работе с большим количеством сообщений, для уменьшения количества запросов к серверу и увеличения производительности. Перед использованием асинхронного режима отправки необходимо предварительно установить положительное значение параметра подключения confirmationWindowSize.
Сделать это можно передачей query-параметра confirmationWindowSize=1 в URI при создании фабрики соединений, которая будет использоваться, в свою очередь, для создания объекта продюсера:

// Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения  
ConnectionFactory cf = new ActiveMQConnectionFactory(URL + ";confirmationWindowSize=1");  
connection = cf.createConnection();  
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);  
TextMessage message = session.createTextMessage("This is a text message"); // Создаем текстовое сообщение для отправки в очередь  

После чего нужно создать обработчик уведомлений CompletionListener.

CompletionListener#

Асинхронный обработчик уведомлений о получении сообщений на брокере можно создать следующим образом:

// Создаем асинхронный обработчик уведомлений о получении сообщений на брокере  
CompletionListener completionListener= new CompletionListener() {  
	@Override
	public void onCompletion(Message message) {
		try {
			logger.info("Для сессии [ClientID={}] получено асинхронное уведомление об успешном приеме на брокере сообщения: ID={}", clientId, message.getJMSMessageID());
		} catch (JMSException e) {
			throw new RuntimeException(e);
		}
	}  
	@Override
	public void onException(Message message, Exception e) {
		try {
			logger.info("Для сессии [ClientID={}] получено асинхронное уведомление о неудачном приеме на брокере сообщения: ID={}", clientId, message.getJMSMessageID());
		} catch (JMSException ex) {
			throw new RuntimeException(ex);
		}
	}
};  

Теперь, для передачи сообщений на брокер в асинхронном режиме, достаточно указать обработчик уведомлений вторым параметром в методе send на объекте-продюсере:

producer.send(message, completionListener);  

Можно также передать кастомный обработчик вместе с отправляемым сообщением непосредственно в методе send() продюсера и выполнить отправку в асинхронном режиме, например:

// Отправляем сообщение с помощью продюсера с кастомным обработчиком в асинхронном режиме  
producer.send(message, new CompletionListener() {  
	@Override
	public void onCompletion(Message message) {
		logger.info("Получено уведомление об успешном приеме на брокере сообщения: {}", message);
	}
	@Override
	public void onException(Message message, Exception e) {
		logger.info("Получено уведомление о неудачном приеме на брокере сообщения: {}, {}", message, e.getMessage());
	}
 });  

Cм. пример 2.

Message#

JMS-клиенты могут работать с разными типами сообщений. Любое сообщение состоит из тела и служебных заголовков (набора пар ключ-значение). Тело сообщения содержит отправляемые приложением данные.
Message - это обобщенный JMS-интерфейс для разного типа сообщений.

По способу представления тела сообщения, различают:

  • строковые сообщения TextMessage (ActiveMQTextMessage) содержат текст в кодировке UTF-8 в виде объекта java.lang.String, например:

// Создаем текстовое UTF-8 сообщение для отправки в очередь  
TextMessage messageStrUtf8 = session.createTextMessage("Тело текстового сообщения в кодировке UTF-8");  
  • бинарные сообщения BytesMessage (ActiveMQBytesMessage) содержат произвольный массив байтов, в том числе и текст в кодировке, отличной от UTF-8, например:

// Сохраняем текст в кодировке Windows-1251 как массив байт  
byte[] win1251Body = ("Тело бинарного сообщения - текст в кодировке Windows-1251").getBytes("Windows-1251");  
// Создаем бинарное сообщение  
BytesMessage messageStrWin1251 = session.createBytesMessage();  
// Заполняем тело бинарного массивом байт  
messageStrWin1251.writeBytes(win1251Body);  
  • потоковое сообщение StreamMessage (ActiveMQStreamMessage) представляет собой последовательность примитивных типов Java, при этом отслеживается порядок и типы этих примитивов в потоке, применяются формальные правила их преобразования;

  • словарь-сообщение MapMessage (ActiveMQMapMessage) сохраняет тело сообщения в виде набора пар ключ-значение, где ключ - это строка java.lang.String, а значение может быть представлено примитивным типом Java, элементы словаря доступны по ключу или последовательно друг за другом;

  • объект-сообщение ObjectMessage (ActiveMQObjectMessage) содержит сериализуемый Java-объект в качестве тела сообщения, что полезно для обмена Java-объектами.

Cм. пример 3.

По размеру сообщения делятся на:

  • обычные (regular), если их размер меньше, чем указано в настройке клиентского транспорта minLargeMessageSize (по умолчанию 100KiB);

  • большие (large), если их размер больше или равен значению minLargeMessageSize.

Разница между обычными и большими сообщениями заключается в том, что большие сообщения в процессе передачи могут быть разделены на несколько частей (с целью оптимизации).
На сервере большие сообщения сохраняются в отдельных файлах и максимальный размер больших сообщений фактически не лимитирован.

Отдельно для больших сообщений можно включать/выключать сжатие на клиенте в настройке compressLargeMessage.
При этом перед отправкой на сервер сообщения, его тело будет архивироваться по алгоритму сжатия ZIP.
Степень сжатия задается отдельно в настройке compressionLevel в пределах от 0 до 9, по умолчанию используется значение -1 (это соответствует уровню 6-7).
Чем больше уровень сжатия, тем меньше трафик сообщений, но больше время на их сжатие и декомпрессию на клиенте.
Если после сжатия размер сообщения станет меньше, чем minLargeMessageSize, то оно будет отправлено как обычное (regular) сообщение. Работа с большими сообщения на клиенте не отличается от работы с обычными сообщениями.

Cм. пример 4.

Особенности настройки клиентского подключения#

Рассмотрим подробно возможности конфигурирования параметров клиентского подключения к брокеру Артемиса.
Передача параметров клиентского подключения к брокеру может быть выполнена разными способами.

Настройка с помощью url#

Необходимые параметры подключения могут быть перечислены в момент создания фабрики соединений в формате query-параметров URL-строки вида tcp://host::port?TranspotConfiguration, например:

String url = "tcp://0.0.0.0:61616?protocols=CORE;supportAdvisory=false;suppressInternalManagementObjects=false;tcpReceiveBufferSize=1048576;tcpSendBufferSize=1048576;useEpoll=true;sslEnabled=false;verifyHost=false;needClientAuth=true";  
// Создаем экземпляр фабрики соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения  
ConnectionFactory cf = new ActiveMQConnectionFactory(url);  

Описание параметров конфигурации#

Рассмотрим подробнее некоторые параметры конфигурации в разрезе по настраиваемым сущностям:

Имя параметра конфигурирования

Тип значения

Краткое описание

— MessageProducer —

autoGroup

boolean

Устанавливает, будут ли продюсеры автоматически назначать идентификатор группы отправленным сообщениям

blockOnDurableSend

boolean

Устанавливает, будут ли продюсеры блокироваться при отправке персистентных сообщений или будут работать в асинхронном режиме

blockOnNonDurableSend

boolean

Устанавливает, будут ли продюсеры блокироваться при отправке неперсистентных сообщений или будут работать в асинхронном режиме

groupID

String

Задает идентификатор группы, который будет установлен на каждом сообщении

outgoingInterceptorList

String

Задает список исходящих перехватчиков (по именам классов их реализации) , который будут выполнены до того, как пакеты будут отправлены на сервер

producerMaxRate

int

Устанавливает ограничение на максимальную скорость производства сообщений (число сообщений в секунду), -1 - означает снятие всех ограничений и используется по умолчанию

producerWindowSize

int

Устанавливает размер окна отправки (буфера сообщений в байтах) для сокращения числа запросов к серверу при работе с большим количеством сообщений

compressionLevel

int

Устанавливает уровень сжатия сообщений в пределах от 0 до 9, по умолчанию используется значение -1 (это соответствует уровню 6-7). Чем больше уровень сжатия, тем меньше трафик сообщений, но больше время на их сжатие и декомпрессию

compressLargeMessages

boolean

Устанавливает, будет ли использоваться сжатие для больших сообщений

ackBatchSize

int

Устанавливает размер батча с подтверждениями (в байтах), отправляемыми в пакетном режиме (за один раз) для сокращения числа запросов к серверу

blockOnAcknowledge

boolean

Устанавливает, будут ли консьюмеры блокироваться при отправке подтверждений или будут работать в асинхронном режиме

cached

boolean

Устанавливает, будут ли большие сообщения, принятые консьюмерами, закешированы во временных файлах или нет

consumerMaxRate

int

Устанавливает ограничение на максимальную скорость потребления сообщений (число сообщений в секунду), -1 - означает снятие всех ограничений и используется по умолчанию

consumerWindowSize

int

Устанавливает размер окна получения (буфера предзагрузки сообщений в байтах) для сокращения числа запросов к серверу при работе с большим количеством сообщений

confirmationWindowSize

int

Устанавливает размер буфера окна подтверждения в байтах

incomingInterceptorList

String

Задает список входящих перехватчиков (по именам классов их реализации), которые будет выполнены после того, как будут получены пакеты с сервера

preAcknowledge

boolean

Устанавливает флаг предварительного подтверждения, который указывает, должны ли сообщения автоматически подтверждаться при получении или следует это делать явно при успешной обработке. Полезно при работе с большим количеством сообщений для увеличения производительности

— ConnectionFactory —

callTimeout

long

Устанавливает тайм-аут для операции вызова. Если операция не выполнится в течение этого времени, будет выброшено исключение CallTimeoutException

clientFailureCheckPeriod

long

Задает период (в мс) периодической проверки (ping) состояния клиента на наличие ошибок или сбоев

loadBalancingPolicyClassName

String

Устанавливает имя класса политики балансировки сообщений между соединениями

connectionTTL

long

Устанавливает времени жизни неактивного соединения

callFailoverTimeout

long

Устанавливает тайм-аут для операции вызова при переключении на резервное соединение. Если операция не выполнится в течение этого времени, будет выброшено исключение CallTimeoutException

failoverAttempts

int

Устанавливает количество попыток переключения на резервное соединение, которое будет использоваться при возникновении сбоя связи или переключении на резервное соединение

initialMessagePacketSize

int

Устанавливает начальный размер пакета сообщений, который будет использоваться для отправки

minLargeMessageSize

int

Устанавливает минимальный размер большого сообщения, который будет использоваться для определения, является ли сообщение большим или нет

passwordCodec

String

Устанавливает класс, который будет использоваться для кодирования и декодирования паролей

reconnectAttempts

int

Устанавливает максимальное количество попыток переподключения, перед ошибкой

retryInterval

long

Устанавливает интервал повторения, который будет использоваться при повторных операциях

retryIntervalMultiplier

double

Устанавливает множитель интервала повторения

maxRetryInterval

long

Устанавливает максимальный интервал повторения, который будет использоваться при повторных операциях

scheduledThreadPoolMaxSize

int

Устанавливает максимальный размер пула потоков планировщика

threadPoolMaxSize

int

Устанавливает максимальный размер пула потоков

useGlobalPools

boolean

Устанавливает флаг использования глобальных пулов потоков

Примечания:

  • в поставку дистрибутива SMBX входит реализация интерцепторов подписи сообщений;

  • при указании значения размера Window-буфера (consumerWindowSize, confirmationWindowSize) предполагается, что:

    • -1 позволяет задать неограниченный по размеру буфер для любого количества сообщений (следует с осторожностью использовать только для «быстрых» консьюмеров воизбежании переполнения буфера памяти);

    • 0 означает, что буфер не используется и сообщения принимаются с сервера/отправляются на сервер немедленно;

    • > 0 означает, что буфер имеет заданный максимальный размер в байтах и сообщения прекращают отправляться, когда размер буфера достигает этого значения.

Режимы коммитов#

Для обеспечения гарантии доставки сообщений, брокер требует от клиента подтверждения факта успешности вычитки каждого сообщения.
Выполняется это на клиенте путем формирования на первом этапе оповещений (ack) для успешно вычитанных сообщений на объекте-консьюмере (MessageConsumer),
с последующей их фиксацией на втором этапе в рамках текущей сессии (Session).
Как только подтверждение будет получено сервером, сообщение исчезнет из очереди и станет недоступно для повторной доставки.
Если же на клиенте произойдет сбой до того, как сервер получит подтверждение, то после восстановления работоспособности клиента сообщение будет доступно для повторной доставки (redelivery).

Данный процесс сделан поэтапным для возможности гибкого управления минимизацией издержек на доставку коммитов при больших нагрузках.
Логика процесса подтверждения вычитки определяется выбором требуемого режима коммитов при создании объекта-сессии, путем передачи соответствующего URL-параметраacknowledgeMode.

Возможны следующие режимы коммитов:

  • Session.AUTO_ACKNOWLEDGE - подтверждение вычитки на клиенте выполняется автоматически, по факту успешного завершения вызова метода консьюмера receive() или вызова слушателя сообщений MessageListener;

  • Session.CLIENT_ACKNOWLEDGE - подтверждение вычитки на клиенте выполняется явно вызовом метода acknowledge() объекта класса Message, при этом можно как индивидуально подтверждать каждое сообщение, так и выбрать подтверждение для группы сообщений (что делается путем вызова acknowledge() для последнего вычитанного сообщения группы, тем самым подтверждая все сообщения группы, полученные ранее в текущей сессии);

  • Session.DUPS_OK_ACKNOWLEDGE - подтверждение вычитки на клиенте выполняется «ленивым» образом, в момент закрытия сессии, одновременно для всех вычитанных сообщений в пакетном режиме. В результате возможно появление дубликатов сообщений, подтверждение которых не достигло брокера из-за проблем при транспортировке и повторно переданных в рамках redelivery;

  • ActiveMQJMSConstants.PRE_ACKNOWLEDGE - специальный режим Artemis: вычитка выполняется без гарантии доставки сообщений (ack производится на сервере, перед доставкой сообщения на клиент);

  • ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE - специальный режим Artemis: индивидуальное подтверждение наследует JMS-семантику CLIENT_ACKNOWLEDGE, за исключением того; что каждое сообщение должно подтверждаться индивидуально вызовом метода acknowledge() объекта класса Message.

С целью минимизации издержек на доставку уведомлений, можно использовать пакетное формирование подтверждений (ack) с помощью настройки ackBatchSize в клиентском коде. В этом случае для CLIENT_ACKNOWLEDGE режима коммитов будет осуществляться подтверждение окном, когда по мере успешной вычитки сообщений, подтверждения (ack) формируются не отдельно для каждого сообщения, а для набора сообщений, совокупный размер которых укладывается в окно (ackBatchSize). Размер окна определяется параметром ackBatchSize для текущей сессии (см. параметр ackBatchSize).

Примеры логирования#

Штатные логи классов клиентского JMS Core API выводятся в режиме WARN и ERROR.
Например, INFO-логи примера 1 выглядят следующим образом:

[main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - JMS-клиент с отправкой и вычиткой текстового сообщения (jms_1)  
[main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - Продюсер отправил на брокер сообщение: This is a text message  
[main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - Консьюмер получил с брокера сообщение: This is a text message  

Для того чтобы получить детальную информацию в логах, требуется перевести логирование в режим DEBUG или TRACE.
Вот как уже выглядит запуск примера 1 с переведенным в режим DEBUG логированием классов из пакета org.apache.activemq.artemis.core.client:

[main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - JMS-клиент с отправкой и вычиткой текстового сообщения (jms_1)  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying reconnection attempt 0/1  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying to connect with connectorFactory=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@3514a4c0 and currentConnectorConfig: TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks  
[Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet  
[Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Connected with the currentConnectorConfig=TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Reconnection successful  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - ClientSessionFactoryImpl received backup update for primary/backup pair = TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks / null  but it didn't belong to TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks  
[main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Requesting 32768 credits on address test-address::queue-1, needed = 32768, arriving = 0  
[main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Request 32768 credits on address test-address::queue-1  
[main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - not asking for -1 credits on test-address::queue-1  
[main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - AfterAcquire 260 credits on address test-address::queue-1, pendingCredits=32508  
[main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - Продюсер отправил на брокер сообщение: This is a text message  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - client ack messageID = 6442459081  
[main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - Консьюмер получил с брокера сообщение: This is a text message  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Calling close on session ClientSessionImpl [name=c7be3e67-a8b7-11ef-a53b-acde48001122, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@29d89d5d, metaData=(jms-session=,jms-client-id=jms-core-client-app-id,)]@1e461e41  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - calling cleanup on ClientSessionImpl [name=c7be3e67-a8b7-11ef-a53b-acde48001122, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@29d89d5d, metaData=(jms-session=,jms-client-id=jms-core-client-app-id,)]@1e461e41  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Calling close on session ClientSessionImpl [name=c7cdf5d8-a8b7-11ef-a53b-acde48001122, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@29d89d5d, metaData=()]@66c92293  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - calling cleanup on ClientSessionImpl [name=c7cdf5d8-a8b7-11ef-a53b-acde48001122, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@29d89d5d, metaData=()]@66c92293  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Session was already closed, giving up now, this=ClientSessionImpl [name=c7cdf5d8-a8b7-11ef-a53b-acde48001122, username=null, closed=true, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@29d89d5d, metaData=()]@66c92293  
[main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Session was already closed, giving up now, this=ClientSessionImpl [name=c7be3e67-a8b7-11ef-a53b-acde48001122, username=null, closed=true, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@29d89d5d, metaData=(jms-session=,jms-client-id=jms-core-client-app-id,)]@1e461e41  

Ниже приведены логи запуска примера 1 с переведенным в режим TRACE логированием классов из пакета org.apache.activemq.artemis.core.client:

2024-11-22 12:56:56.830 [main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - JMS-клиент с отправкой и вычиткой текстового сообщения (jms_1)  
2024-11-22 12:56:57.081 [main] org.apache.activemq.artemis.core.client.impl.Topology TRACE - Topology@7f3b84b8 CREATE  
2024-11-22 12:56:57.177 [main] org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl TRACE - Selecting connector from initial connectors.  
2024-11-22 12:56:57.189 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl TRACE - getConnectionWithRetry::1 with retryInterval = 2000 multiplier = 1.0  
2024-11-22 12:56:57.190 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying reconnection attempt 0/1  
2024-11-22 12:56:57.191 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying to connect with connectorFactory=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@101952da and currentConnectorConfig: TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks  
2024-11-22 12:56:58.659 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet  
2024-11-22 12:56:58.790 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet  
2024-11-22 12:56:58.790 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Connected with the currentConnectorConfig=TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks  
2024-11-22 12:56:58.806 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet  
2024-11-22 12:56:58.816 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl TRACE - returning RemotingConnectionImpl [ID=3f4f1f56, clientID=null, nodeID=null, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection@c7ba306[ID=3f4f1f56, local= /10.98.36.238:52931, remote=10.25.68.8/10.25.68.8:41777]]  
2024-11-22 12:56:58.831 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452::Subscribing Topology  
2024-11-22 12:56:58.832 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Reconnection successful  
2024-11-22 12:56:58.833 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - ClientSessionFactoryImpl received backup update for primary/backup pair = TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks / null  but it didn't belong to TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks  
2024-11-22 12:56:58.850 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl TRACE - NodeUp ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?trustStorePassword=****&port=41777&keyStorePassword=****&sslEnabled=true&host=10-25-68-8&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-jks&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/client/artemis-dev-keystore-jks], discoveryGroupConfiguration=null]::nodeID=ff162561-6ab9-11ef-8806-0050560a71e6, connectorPair=Pair[a=TransportConfiguration(name=artemis, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&port=41777&sslEnabled=true&host=10-25-68-8&verifyHost=false&activemq-passwordcodec=****&activemq-usemaskedpassword=****, b=null]  
2024-11-22 12:56:58.852 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.Topology TRACE - Topology@7f3b84b8::NewMemberAdd nodeId=ff162561-6ab9-11ef-8806-0050560a71e6 member = TopologyMember[id=ff162561-6ab9-11ef-8806-0050560a71e6, connector=Pair[a=TransportConfiguration(name=artemis, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&port=41777&sslEnabled=true&host=10-25-68-8&verifyHost=false&activemq-passwordcodec=****&activemq-usemaskedpassword=****, b=null], backupGroupName=null, scaleDownGroupName=null]  
2024-11-22 12:56:58.852 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.Topology TRACE - Topology@7f3b84b8::prepare to send ff162561-6ab9-11ef-8806-0050560a71e6 to 0 elements  
2024-11-22 12:56:59.079 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits 32768 on address test-address::queue-1, needed=32768, credits=32768, window=32768  
2024-11-22 12:56:59.079 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits on Address test-address::queue-1, requesting=32768, arriving=0, balance=0  
2024-11-22 12:56:59.079 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Requesting 32768 credits on address test-address::queue-1, needed = 32768, arriving = 0  
2024-11-22 12:56:59.079 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Request 32768 credits on address test-address::queue-1  
2024-11-22 12:56:59.149 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}:: being created at  
2024-11-22 12:56:59.182 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl TRACE - setAddress() Setting default address as test-address::queue-1  
2024-11-22 12:56:59.182 [main] org.apache.activemq.artemis.core.client.impl.ClientProducerImpl TRACE - sendRegularMessage::ClientMessageImpl[messageID=0, durable=true, address=test-address::queue-1,userID=1d5c461a-a8b8-11ef-b6cb-aebf3734a116,properties=TypedProperties[__AMQ_CID=jms-core-client-app-id, _AMQ_ROUTING_TYPE=1]], Blocking=true  
2024-11-22 12:56:59.198 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits 260 on address test-address::queue-1, needed=32768, credits=260, window=32768  
2024-11-22 12:56:59.198 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits did not need it, balance=32768, arriving=0,  needed=32768, getbalance + arriving < needed=false  
2024-11-22 12:56:59.198 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - not asking for -1 credits on test-address::queue-1  
2024-11-22 12:56:59.198 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - AfterAcquire 260 credits on address test-address::queue-1, pendingCredits=32508  
2024-11-22 12:56:59.223 [main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - Продюсер отправил на брокер сообщение: This is a text message  
2024-11-22 12:56:59.224 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}:: receive(5000)  
2024-11-22 12:56:59.224 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}::receive(5000, false)  
2024-11-22 12:56:59.242 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}::Returning ClientMessageImpl[messageID=6442459095, durable=true, address=test-address::queue-1,userID=1d5c461a-a8b8-11ef-b6cb-aebf3734a116,properties=TypedProperties[__AMQ_CID=jms-core-client-app-id, _AMQ_INGRESS_TIMESTAMP=1732269419506, _AMQ_ROUTING_TYPE=1, _AMQ_VALIDATED_USER=user_1, TIMESTAMP_FROM_SERVER=1732269419533]]  
2024-11-22 12:56:59.243 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}:: returning ClientMessageImpl[messageID=6442459095, durable=true, address=test-address::queue-1,userID=1d5c461a-a8b8-11ef-b6cb-aebf3734a116,properties=TypedProperties[__AMQ_CID=jms-core-client-app-id, _AMQ_INGRESS_TIMESTAMP=1732269419506, _AMQ_ROUTING_TYPE=1, _AMQ_VALIDATED_USER=user_1, TIMESTAMP_FROM_SERVER=1732269419533]]  
2024-11-22 12:56:59.243 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}::acknowledge ackBytes=431 and ackBatchSize=0, encodeSize=431  
2024-11-22 12:56:59.244 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}:: acknowledge acking ClientMessageImpl[messageID=6442459095, durable=true, address=test-address::queue-1,userID=1d5c461a-a8b8-11ef-b6cb-aebf3734a116,properties=TypedProperties[__AMQ_CID=jms-core-client-app-id, _AMQ_INGRESS_TIMESTAMP=1732269419506, _AMQ_ROUTING_TYPE=1, _AMQ_VALIDATED_USER=user_1, TIMESTAMP_FROM_SERVER=1732269419533]]  
2024-11-22 12:56:59.244 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@4b1d6571{consumerContext=ActiveMQConsumerContext{id=0}, queueName=test-address::queue-1}::Acking message ClientMessageImpl[messageID=6442459095, durable=true, address=test-address::queue-1,userID=1d5c461a-a8b8-11ef-b6cb-aebf3734a116,properties=TypedProperties[__AMQ_CID=jms-core-client-app-id, _AMQ_INGRESS_TIMESTAMP=1732269419506, _AMQ_ROUTING_TYPE=1, _AMQ_VALIDATED_USER=user_1, TIMESTAMP_FROM_SERVER=1732269419533]]  
2024-11-22 12:56:59.244 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - client ack messageID = 6442459095  
2024-11-22 12:56:59.244 [main] ru.sbt.ss.artemis.clients.jms.ExJms1Run INFO  - Консьюмер получил с брокера сообщение: This is a text message  
2024-11-22 12:56:59.256 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Calling close on session ClientSessionImpl [name=1d3efa19-a8b8-11ef-b6cb-aebf3734a116, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452, metaData=()]@63fbfaeb  
2024-11-22 12:56:59.304 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - calling cleanup on ClientSessionImpl [name=1d3efa19-a8b8-11ef-b6cb-aebf3734a116, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452, metaData=()]@63fbfaeb  
2024-11-22 12:56:59.305 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Calling close on session ClientSessionImpl [name=1d2b4b08-a8b8-11ef-b6cb-aebf3734a116, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452, metaData=(jms-session=,jms-client-id=jms-core-client-app-id,)]@64bce832  
2024-11-22 12:56:59.337 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - calling cleanup on ClientSessionImpl [name=1d2b4b08-a8b8-11ef-b6cb-aebf3734a116, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452, metaData=(jms-session=,jms-client-id=jms-core-client-app-id,)]@64bce832  
2024-11-22 12:56:59.347 [Thread-0 (ActiveMQ-client-global-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Failure captured on connectionID=3f4f1f56, performing failover or reconnection now  
2024-11-22 12:56:59.349 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Session was already closed, giving up now, this=ClientSessionImpl [name=1d3efa19-a8b8-11ef-b6cb-aebf3734a116, username=null, closed=true, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452, metaData=()]@63fbfaeb  
2024-11-22 12:56:59.350 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Session was already closed, giving up now, this=ClientSessionImpl [name=1d2b4b08-a8b8-11ef-b6cb-aebf3734a116, username=null, closed=true, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@4de4b452, metaData=(jms-session=,jms-client-id=jms-core-client-app-id,)]@64bce832  

Для детализации логирования без понижения уровня до DEBUG или TRACE в клиентском приложении можно использовать логирование в различных обработчиках.

Вот как выглядит запуск примера 2, с дополнительным логированием в обработчиках MessageListener и CompletionListener:

[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - JMS-клиент с синхронной и асинхронной отправкой, обработчики MessageListener и CompletionListener (jms_2)  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Продюсер сессии [ClientID=jms-core-client-app-id] готов асинхронно отправить на брокер ПЕРВОЕ сообщение: ID=null, содержимое=This is a text message 1  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Продюсер сессии [ClientID=jms-core-client-app-id] завершил асинхронную отправку на брокер сообщения: ID=ID:a350e059-a8b8-11ef-a76a-aebf3734a116, содержимое=This is a text message 1  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Продюсер сессии [ClientID=jms-core-client-app-id] готов асинхронно отправить на брокер ВТОРОЕ сообщение: ID=null, содержимое=This is a text message 2  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Продюсер сессии [ClientID=jms-core-client-app-id] завершил асинхронную отправку на брокер сообщения: ID=ID:a3517c9a-a8b8-11ef-a76a-aebf3734a116, содержимое=This is a text message 2  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Небольшая пауза перед стартом соединения (подключением консьюмера)  
[Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.jms.ExJms2Run$1 INFO  - Для сессии [ClientID=jms-core-client-app-id] получено асинхронное уведомление об успешном приеме на брокере сообщения: ID=ID:a350e059-a8b8-11ef-a76a-aebf3734a116  
[Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.jms.ExJms2Run$2 INFO  - Получено уведомление об успешном приеме на брокере сообщения: ActiveMQMessage[ID:a3517c9a-a8b8-11ef-a76a-aebf3734a116]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=test-address::queue-1,userID=a3517c9a-a8b8-11ef-a76a-aebf3734a116,properties=TypedProperties[__AMQ_CID=jms-core-client-app-id, _AMQ_ROUTING_TYPE=1]]  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Подключение к брокеру установлено!  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - А теперь можно поискать в консоли управления брокера (на вкладках Connections, Consumers, Producers) информацию о клиенте по ClientID=jms-core-client-app-id ...  
[Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Консьюмер сессии [ClientID=jms-core-client-app-id] получил асинхронно с брокера сообщение: ID=ID:a350e059-a8b8-11ef-a76a-aebf3734a116, содержимое=This is a text message 1  
[Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Консьюмер сессии [ClientID=jms-core-client-app-id] получил асинхронно с брокера сообщение: ID=ID:a3517c9a-a8b8-11ef-a76a-aebf3734a116, содержимое=This is a text message 2  
[main] ru.sbt.ss.artemis.clients.jms.ExJms2Run INFO  - Все, 30 секунд истекло, наш клиент закрыл сессии и их ClientID исчезли из консоли управления брокера :(  

Примеры готовых классов для демонстрации работы JMS-клиента#

Пример 1. Пример создания и настройки JMS клиента для отправки и чтения текстового сообщения#

ExJms1Run.java#

package ru.sbt.ss.artemis.clients.jms;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class ExJms1Run {
    private static final Logger logger = LoggerFactory.getLogger(ExJms1Run.class);

    // Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
    public final static String URL = "tcp://0.0.0.0:61616";

    // Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
    public static String ADDRESS = "test-address";
    public static String QUEUE = "queue-1";
    public static String FQQN = ADDRESS + "::" + QUEUE;

    public static final String EX_NUMBER = "jms_1";
    public static final String EX_DESCRIPTION = "JMS-клиент с отправкой и вычиткой текстового сообщения";

    public static void main(String[] args) throws Exception {
        logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);

        Connection connection = null;
        try {
            // Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения
            ConnectionFactory cf = new ActiveMQConnectionFactory(URL);

            // Создаем JMS-соединение с брокером
            connection = cf.createConnection();

            // Уникальный идентификатор, который поможет в дальнейшем во время отладки и мониторинга клиентского приложения
            connection.setClientID("jms-core-client-app-id");

            // Создаем JMS-сессию для поддержки работы с потреблением и поставкой событий брокеру
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Поддержка автоматических подтверждений получения сообщений

            // Создаем объект JMS-очередь для полного имени (FQQN) очереди `test-address::queue-1` на брокере
            Queue queue = ActiveMQJMSClient.createQueue(FQQN);

            // Создаем поставщик (продюсер) сообщений в очередь на брокере
            MessageProducer producer = session.createProducer(queue);

            // Создаем потребитель (консьюмер) сообщений из очереди на брокере
            MessageConsumer messageConsumer = session.createConsumer(queue);

            // Создаем текстовое сообщение для отправки в очередь
            TextMessage message = session.createTextMessage("This is a text message");

            // Отправляем сообщение с помощью продюсера
            producer.send(message);
            logger.info("Продюсер отправил на брокер сообщение: {}", message.getText());

            // Стартуем соединение, для того чтобы консьюмер смог начать получать сообщения с брокера
            connection.start();

            // Вычитываем сообщение из брокера с помощью консьюмера
            TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
            logger.info("Консьюмер получил с брокера сообщение: {}", messageReceived.getText());
        } finally {
            if (connection != null) {
                // Закрываем соединение после окончания работы с брокером
                connection.close();
            }
        }
    }

}

ExJms1.java#

package ru.sbt.ss.artemis.clients.jms.demo;

import org.slf4j.Logger;
import ru.sbt.ss.artemis.clients.jms.ExJms1Run;
import ru.sbt.ss.artemis.demo.DemoExample;

public class ExJms1 implements DemoExample {
    public String getNumber() {
        return ExJms1Run.EX_NUMBER;
    }

    @Override
    public String getDescription() {
        return ExJms1Run.EX_DESCRIPTION;
    }

    @Override
    public void run(Logger logger) throws Exception {
        ExJms1Run.main(null);
    }
}

Пример 2. Пример клиента с синхронной и асинхронной отправкой, обработчики MessageListener и CompletionListener#

ExJms2Run.java#

package ru.sbt.ss.artemis.clients.jms;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class ExJms2Run {
    private static final Logger logger = LoggerFactory.getLogger(ExJms2Run.class);

    // Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
    public final static String URL = "tcp://0.0.0.0:61616";

    // Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
    public static String ADDRESS = "test-address";
    public static String QUEUE = "queue-1";
    public static String FQQN = ADDRESS + "::" + QUEUE;

    public static final String EX_NUMBER = "jms_2";
    public static final String EX_DESCRIPTION = "JMS-клиент с синхронной и асинхронной отправкой, обработчики MessageListener и CompletionListener";

    public static void main(String[] args) throws Exception {
        logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);

        Connection connection = null;
        try {
            var setConfirmationWindowSize = ";confirmationWindowSize=1";  // размер окна подтверждения должен быть >= 0, чтобы заработала пересылки уведомлений в обработчик CompletionListener

            // Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения
            ConnectionFactory cf = new ActiveMQConnectionFactory(URL + setConfirmationWindowSize);

            // Создаем JMS-соединение с брокером
            connection = cf.createConnection();

            // Уникальный идентификатор, который поможет в дальнейшем во время отладки и мониторинга клиентского приложения
            connection.setClientID("jms-core-client-app-id");
            var clientId = connection.getClientID();

            // Регистрируем обработчик ошибок подключения
            connection.setExceptionListener(e -> logger.error("Ошибка соединения с брокером: {}", e.getMessage()));

            // Создаем асинхронную JMS-сессию для поддержки работы с потреблением и поставкой событий брокеру
            Session asyncSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // Требуется явное подтверждение факта получения сообщений

            // Создаем объект JMS-очередь для полного имени (FQQN) очереди `test-address::queue-1` на брокере
            Queue queue = ActiveMQJMSClient.createQueue(FQQN);

            // Создаем продюсер сообщений в очередь с асинхронной отправкой сообщений
            MessageProducer asyncSessionProducer = asyncSession.createProducer(queue);

            // Создаем асинхронный обработчик уведомлений о получении сообщений на брокере
            CompletionListener completionListener= new CompletionListener() {
                @Override
                public void onCompletion(Message message) {
                    try {
                        logger.info("Для сессии [ClientID={}] получено асинхронное уведомление об успешном приеме на брокере сообщения: ID={}", clientId, message.getJMSMessageID());
                    } catch (JMSException e) {
                        throw new RuntimeException(e);
                    }
                }

                @Override
                public void onException(Message message, Exception e) {
                    try {
                        logger.info("Для сессии [ClientID={}] получено асинхронное уведомление о неудачном приеме на брокере сообщения: ID={}", clientId, message.getJMSMessageID());
                    } catch (JMSException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            };

            // Создаем консьюмер сообщений из очереди `test-address::queue-1` на брокере
            MessageConsumer messageConsumer = asyncSession.createConsumer(queue);

            // Регистрируем обработчик асинхронного получения сообщений с брокера
            messageConsumer.setMessageListener(message -> {
                try {
                    var messageReceived = (TextMessage) message;
                    logger.info("Консьюмер сессии [ClientID={}] получил асинхронно с брокера сообщение: ID={}, содержимое={}", clientId, message.getJMSMessageID(), messageReceived.getText());
                    messageReceived.acknowledge(); // Подтверждаем факт успешной вычитки сообщения
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            });

            // Создаем текстовое сообщение №1 для отправки в очередь
            TextMessage message1 = asyncSession.createTextMessage("This is a text message 1");
            // Создаем текстовое сообщение №2 для отправки в очередь
            TextMessage message2 = asyncSession.createTextMessage("This is a text message 2");

            logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер ПЕРВОЕ сообщение: ID={}, содержимое={}", clientId, message1.getJMSMessageID(), message1.getText());
            // Отправляем сообщение с помощью продюсера с обработчиком уведомлений в асинхронном режиме
            asyncSessionProducer.send(message1, completionListener);
            logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", clientId, message1.getJMSMessageID(), message1.getText());

            logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер ВТОРОЕ сообщение: ID={}, содержимое={}", clientId, message2.getJMSMessageID(), message2.getText());
            // Отправляем сообщение с помощью продюсера с кастомным обработчиком уведомлений в асинхронном режиме
            asyncSessionProducer.send(message2, new CompletionListener() {
                @Override
                public void onCompletion(Message message) {
                    logger.info("Получено уведомление об успешном приеме на брокере сообщения: {}", message);
                }
                @Override
                public void onException(Message message, Exception e) {
                    logger.info("Получено уведомление о неудачном приеме на брокере сообщения: {}, {}", message, e.getMessage());
                }
            });
            logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", clientId, message2.getJMSMessageID(), message2.getText());

            // Небольшая пауза перед стартом соединения для начала вычитки сообщений из очереди
            logger.info("Небольшая пауза перед стартом соединения (подключением консьюмера)");
            Thread.sleep(1000);

            // Стартуем соединение, для того чтобы консьюмер смог начать получать сообщения с брокера с помощью асинхронного обработчика сообщений
            connection.start();
            logger.info("Подключение к брокеру установлено!");

            logger.info("А теперь можно поискать в консоли управления брокера (на вкладках Connections, Consumers, Producers) информацию о клиенте по ClientID={} ...", clientId);
            // Задержка на время для поиска информации о клиенте в консоли управления брокера
            Thread.sleep(30000);

            logger.info("Все, 30 секунд истекло, наш клиент закрыл сессии и их ClientID исчезли из консоли управления брокера :(");
        } finally {
            if (connection != null) {
                // Закрываем соединение после окончания работы с брокером
                connection.stop();
                connection.close();
            }
        }
    }

}

ExJms2.java#

package ru.sbt.ss.artemis.clients.jms.demo;

import org.slf4j.Logger;
import ru.sbt.ss.artemis.clients.jms.ExJms2Run;
import ru.sbt.ss.artemis.demo.DemoExample;

public class ExJms2 implements DemoExample {
    public String getNumber() {
        return ExJms2Run.EX_NUMBER;
    }

    @Override
    public String getDescription() {
        return ExJms2Run.EX_DESCRIPTION;
    }

    @Override
    public void run(Logger logger) throws Exception {
        ExJms2Run.main(null);
    }
}

Пример 3. Пример работы с сообщениями разных типов#

ExJms3Run.java#

package ru.sbt.ss.artemis.clients.jms;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.time.Instant;

public class ExJms3Run {
    private static final Logger logger = LoggerFactory.getLogger(ExJms3Run.class);

    // Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
    public final static String URL = "tcp://0.0.0.0:61616";

    // Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
    public static String ADDRESS = "test-address";
    public static String QUEUE = "queue-1";
    public static String FQQN = ADDRESS + "::" + QUEUE;

    public static final String EX_NUMBER = "jms_3";
    public static final String EX_DESCRIPTION = "JMS-клиент с примерами работы с сообщениями разных типов (текстовые и бинарные)";

    public static void main(String[] args) throws Exception {
        logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);

        Connection connection = null;
        try {
            var setConfirmationWindowSize = ";confirmationWindowSize=1";  // размер окна подтверждения должен быть >= 0, чтобы заработала пересылки уведомлений в обработчик CompletionListener

            // Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения
            ConnectionFactory cf = new ActiveMQConnectionFactory(URL + setConfirmationWindowSize);

            // Создаем JMS-соединение с брокером
            connection = cf.createConnection();

            // Создаем JMS-сессию для поддержки работы с потреблением и поставкой событий брокеру
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Создаем объект JMS-очередь для полного имени (FQQN) очереди `test-address::queue-1` на брокере
            Queue queue = ActiveMQJMSClient.createQueue(FQQN);

            // Создаем продюсер сообщений в очередь с асинхронной отправкой сообщений
            MessageProducer producer = session.createProducer(queue);

            // Создаем консьюмер сообщений из очереди `test-address::queue-1` на брокере
            MessageConsumer messageConsumer = session.createConsumer(queue);

            // Регистрируем обработчик асинхронного получения сообщений с брокера
            messageConsumer.setMessageListener(message -> {
                try {
                    switch (((ActiveMQMessage) message).getType()) {
                        case ActiveMQTextMessage.TYPE: {
                            var textMessage = (TextMessage) message;
                            logger.info("Консьюмер получил с брокера текстовое сообщение: ID={}, содержимое={}", textMessage.getJMSMessageID(), textMessage);
                            break;
                        }
                        case ActiveMQBytesMessage.TYPE: {
                            var bytesMessage = (BytesMessage) message;
                            var bytes = new byte[(int) bytesMessage.getBodyLength()];
                            bytesMessage.readBytes(bytes);
                            try {
                                String strWin1251 = new String(bytes, "Windows-1251");
                                logger.info("Консьюмер получил с брокера бинарное сообщение: ID={}, содержимое={}", bytesMessage.getJMSMessageID(), strWin1251);
                            } catch (UnsupportedEncodingException e) {
                                throw new RuntimeException(e);
                            }
                            break;
                        }
                        default: {
                            logger.info("Консьюмер получил с брокера {} сообщение: ID={}", message.getJMSType(), message.getJMSMessageID());
                        }
                    }
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            });

            // Создаем текстовое UTF-8 сообщение для отправки в очередь
            String utf8Body = "Тело текстового сообщения в кодировке UTF-8 создано " + Instant.now();
            TextMessage messageStrUtf8 = session.createTextMessage(utf8Body);

            // Создаем бинарное сообщение с текстом в кодировке Windows-1251 для отправки в очередь в виде массива байт
            byte[] win1251Body = ("Тело бинарного сообщения - текст в кодировке Windows-1251 создано " + Instant.now()).getBytes("Windows-1251");
            BytesMessage messageStrWin1251 = session.createBytesMessage();
            messageStrWin1251.writeBytes(win1251Body);

            // Отправляем сообщение с помощью продюсера
            producer.send(messageStrUtf8);
            logger.info("Продюсер отправил на брокер сообщение: {}", messageStrUtf8);

            // Отправляем сообщение с помощью продюсера
            producer.send(messageStrWin1251);
            logger.info("Продюсер отправил на брокер сообщение: {}", messageStrWin1251);

            // Небольшая пауза перед стартом соединения для начала вычитки сообщений из очереди
            logger.info("Небольшая пауза перед стартом соединения (подключением консьюмера)");
            Thread.sleep(10000);

            // Стартуем соединение, для того чтобы консьюмер смог начать получать сообщения с брокера с помощью асинхронного обработчика сообщений
            connection.start();
            logger.info("Подключение к брокеру установлено!");

            // Небольшая пауза перед завершением работы
            logger.info("Небольшая пауза перед завершением работы");
            Thread.sleep(10000);
        } finally {
            if (connection != null) {
                // Закрываем соединение после окончания работы с брокером
                connection.stop();
                connection.close();
            }
        }
    }

}

ExJms3.java#

package ru.sbt.ss.artemis.clients.jms.demo;

import org.slf4j.Logger;
import ru.sbt.ss.artemis.clients.jms.ExJms3Run;
import ru.sbt.ss.artemis.demo.DemoExample;

public class ExJms3 implements DemoExample {
    public String getNumber() {
        return ExJms3Run.EX_NUMBER;
    }

    @Override
    public String getDescription() {
        return ExJms3Run.EX_DESCRIPTION;
    }

    @Override
    public void run(Logger logger) throws Exception {
        ExJms3Run.main(null);
    }
}

Пример 4. Пример работы с большими сообщениями#

ExJms4Run.java#

package ru.sbt.ss.artemis.clients.jms;

import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class ExJms4Run {
    private static final Logger logger = LoggerFactory.getLogger(ExJms4Run.class);

    // Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
    public final static String URL = "tcp://0.0.0.0:61616";

    // Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
    public static String ADDRESS = "test-address";
    public static String QUEUE = "queue-1";
    public static String FQQN = ADDRESS + "::" + QUEUE;

    public static final String EX_NUMBER = "jms_4";
    public static final String EX_DESCRIPTION = "JMS-клиент с примерами работы с большими сообщениями";

    public static void main(String[] args) throws Exception {
        logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);

        int COMPRESSION_LEVEL = 0;         // Степень сжатия сообщений в пределах от `0` до `9`, по умолчанию используется значение `-1` (это соответствует уровню 6-7).
        int LARGE_MSG_SIZE = 10000;        // Кастомный минимальный размер большого сообщения в байтах
        boolean COMPRESS_LARGE_MSG = true; // Включать сжатие больших сообщений?
        double COMPRESS_FACTOR = !(COMPRESS_LARGE_MSG && COMPRESSION_LEVEL != 0)? 1.1 : 300; // Эмпирический коэффициент увеличения размера исходного сообщения, чтобы после компрессии с уровнем `5` размер был > minLargeMessageSize

        Connection connection = null;
        try {
            var setConfirmationWindowSize = ";confirmationWindowSize=1";                 // Размер окна подтверждения должен быть >= 0, чтобы заработала пересылки уведомлений в обработчик CompletionListener
            var setCompressionLevel = ";compressionLevel=" + COMPRESSION_LEVEL;          // Устанавливаем уровень сжатия сообщений
            var setCompressLargeMessage = ";compressLargeMessage=" + COMPRESS_LARGE_MSG; // Включаем сжатие больших сообщений
            var setCustomMinLargeMessageSize = ";minLargeMessageSize=" + LARGE_MSG_SIZE; // Устанавливаем кастомный минимальный размер большого сообщения в байтах (по умолчанию 100KiB)

            // Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения
            ConnectionFactory cf = new ActiveMQConnectionFactory(URL
                    + setConfirmationWindowSize
                    + setCompressionLevel
                    + setCompressLargeMessage
                    + setCustomMinLargeMessageSize
            );

            // Создаем JMS-соединение с брокером
            connection = cf.createConnection();

            // Создаем JMS-сессию для поддержки работы с потреблением и поставкой событий брокеру
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Создаем объект JMS-очередь для полного имени (FQQN) очереди `test-address::queue-1` на брокере
            Queue queue = ActiveMQJMSClient.createQueue(FQQN);

            // Создаем продюсер сообщений в очередь с асинхронной отправкой сообщений
            MessageProducer producer = session.createProducer(queue);

            // Создаем консьюмер сообщений из очереди `test-address::queue-1` на брокере
            MessageConsumer messageConsumer = session.createConsumer(queue);

            // Регистрируем обработчик асинхронного получения сообщений с брокера
            messageConsumer.setMessageListener(message -> {
                try {
                    switch (((ActiveMQMessage) message).getType()) {
                        case ActiveMQTextMessage.TYPE: {
                            var textMessage = (TextMessage) message;
                            logger.info("Консьюмер получил с брокера текстовое сообщение: id={}, сообщение={}, \nтело={}", textMessage.getJMSMessageID(), textMessage, textMessage.getBody(String.class));
                            break;
                        }
                        case ActiveMQBytesMessage.TYPE: {
                            var bytesMessage = (BytesMessage) message;
                            var bytes = new byte[(int) bytesMessage.getBodyLength()];
                            bytesMessage.readBytes(bytes);
                            try {
                                String strWin1251 = new String(bytes, "Windows-1251");
                                logger.info("Консьюмер получил с брокера бинарное сообщение: id={}, сообщение={}, \nтело={}", bytesMessage.getJMSMessageID(), bytesMessage, strWin1251);
                            } catch (UnsupportedEncodingException e) {
                                throw new RuntimeException(e);
                            }
                            break;
                        }
                        default: {
                            logger.info("Консьюмер получил с брокера сообщение: ID={}, MSG={}", message.getJMSMessageID(), message);
                        }
                    }
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            });

            // Создаем БОЛЬШОЕ текстовое UTF-8 сообщение, размером больше, чем задано в настройке minLargeMessageSize
            // Необходимо учитывать, что при включении сжатия (compressLargeMessage=true), фактический размер сообщения станет меньше!!!
            // Также помним, что каждый символ UTF-8 занимает 2 байта в очереди на брокере!!!
            String utf8Text = "Тело текстового большого сообщения в кодировке UTF-8 ";
            String utf8LargeBody = utf8Text.repeat((int) (COMPRESS_FACTOR * LARGE_MSG_SIZE / utf8Text.length() / 2)); // доводим размер сообщения до БОЛЬШОГО (> minLargeMessageSize)
            TextMessage messageStrUtf8 = session.createTextMessage(utf8LargeBody);
            var utf8MessageSize = messageStrUtf8.getBody(String.class).length();
            logger.info("Создано большое текстовое сообщение, тело (до сжатия) размером: {} байт (размер в очереди - {})", utf8MessageSize, utf8MessageSize * 2);

            // Создаем БОЛЬШОЕ бинарное сообщение с текстом в кодировке Windows-1251 для отправки в очередь в виде массива байт
            // Необходимо учитывать, что при включении сжатия (compressLargeMessage=true), фактический размер сообщения станет меньше!!!
            String win1251Text = "Тело бинарного большого сообщения с текстом в кодировке Windows-1251 ";
            String win1251LargeBodyText = win1251Text.repeat((int) (COMPRESS_FACTOR * LARGE_MSG_SIZE / win1251Text.length())); // доводим размер сообщения до БОЛЬШОГО (> minLargeMessageSize)
            byte[] win1251LargeBodyBytes = win1251LargeBodyText.getBytes("Windows-1251");
            BytesMessage messageStrWin1251 = session.createBytesMessage();
            messageStrWin1251.writeBytes(win1251LargeBodyBytes);
            logger.info("Создано большое бинарное сообщение, тело (до сжатия) размером: {} байт", win1251LargeBodyBytes.length);

            // Отправляем сообщение с помощью продюсера
            producer.send(messageStrUtf8);
            logger.info("Продюсер отправил на брокер текстовое сообщение: {}", messageStrUtf8);

            // Отправляем сообщение с помощью продюсера
            producer.send(messageStrWin1251);
            logger.info("Продюсер отправил на брокер бинарное сообщение: {}", messageStrWin1251);

            // Небольшая пауза перед стартом соединения для начала вычитки сообщений из очереди
            // Можно убедиться, что сообщения успешно попали в очередь и имеют признак большого сообщения ("largeMessage": true)
            var delaySec = 10;
            logger.info("Небольшая пауза ({}сек) перед стартом соединения (подключением консьюмера)", delaySec);
            Thread.sleep(delaySec * 1000);

            // Стартуем соединение, для того чтобы консьюмер смог начать получать сообщения с брокера с помощью асинхронного обработчика сообщений
            connection.start();
            logger.info("Подключение к брокеру установлено!");

            logger.info("Небольшая пауза ({}сек) перед завершением работы", delaySec);
            Thread.sleep(delaySec * 1000);
        } finally {
            if (connection != null) {
                // Закрываем соединение после окончания работы с брокером
                connection.stop();
                connection.close();
            }
        }
    }

}

ExJms4.java#

package ru.sbt.ss.artemis.clients.jms.demo;

import org.slf4j.Logger;
import ru.sbt.ss.artemis.clients.jms.ExJms4Run;
import ru.sbt.ss.artemis.demo.DemoExample;

public class ExJms4 implements DemoExample {
    public String getNumber() {
        return ExJms4Run.EX_NUMBER;
    }

    @Override
    public String getDescription() {
        return ExJms4Run.EX_DESCRIPTION;
    }

    @Override
    public void run(Logger logger) throws Exception {
        ExJms4Run.main(null);
    }
}

Пример 5. Пример работы с разными режимами коммитов вычитки сообщений#

ExJms5Run.java#

package ru.sbt.ss.artemis.clients.jms;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class ExJms5Run {
    private static final Logger logger = LoggerFactory.getLogger(ExJms5Run.class);

    // Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
    public final static String URL = "tcp://0.0.0.0:61616?ackBatchSize=1500";

    // Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
    public static String ADDRESS = "test-address";
    public static String QUEUE = "queue-1";
    public static String FQQN = ADDRESS + "::" + QUEUE;

    public static final String EX_NUMBER = "jms_5";
    public static final String EX_DESCRIPTION = "JMS-клиент с разными режимами коммитов вычитки сообщений";

    public static void main(String[] args) throws Exception {
        logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);

        Connection connection = null;
        try {
            // Создаем фабрику соединений для выполнения клиентского подключения к брокеру, используя URI с параметрами подключения
            ConnectionFactory cf = new ActiveMQConnectionFactory(URL);

            // Создаем JMS-соединение с брокером
            connection = cf.createConnection();

            // Уникальный идентификатор, который поможет в дальнейшем во время отладки и мониторинга клиентского приложения
            connection.setClientID("jms-core-client-app-id");

            // Создаем JMS-сессию для поддержки работы с потреблением и поставкой событий брокеру
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            // Session session = connection.createSession(false, ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
            // Session session = connection.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);


            // Создаем объект JMS-очередь для полного имени (FQQN) очереди `test-address::queue-1` на брокере
            Queue queue = ActiveMQJMSClient.createQueue(FQQN);

            // Создаем поставщик (продюсер) сообщений в очередь на брокере
            MessageProducer producer = session.createProducer(queue);

            // Создаем потребитель (консьюмер) сообщений из очереди на брокере
            MessageConsumer messageConsumer = session.createConsumer(queue);

            // Создаем текстовое сообщение для отправки в очередь
            TextMessage message = session.createTextMessage("This is a text message");

            // Отправляем сообщения с помощью продюсера
            for (var i = 1; i <= 10; i++) {
                producer.send(message);
                logger.info("Продюсер отправил на брокер сообщение №{}: {}", i, message.getText());
            }

            // Стартуем соединение, для того чтобы консьюмер смог начать получать сообщения с брокера
            connection.start();

            // Вычитываем сообщения из брокера с помощью консьюмера
            var groupSize = 5;
            for (var i = 1; i <= 10; i++) {
                message = (TextMessage) messageConsumer.receive();
                logger.info("Консьюмер получил с брокера сообщение №{}: {}", i, message.getText());
                if (i % groupSize == 0) {
                    logger.info("Готовимся вызвать acknowledge() для сообщения №{}: {}", i, message.getText());
                    message.acknowledge();
                    logger.info("Завершен вызов acknowledge() для сообщения №{}: {}", i, message.getText());
                }
            }
        } finally {
            if (connection != null) {
                // Закрываем соединение после окончания работы с брокером
                connection.close();
            }
        }
    }

}

ExJms5.java#

package ru.sbt.ss.artemis.clients.jms.demo;

import org.slf4j.Logger;
import ru.sbt.ss.artemis.clients.jms.ExJms5Run;
import ru.sbt.ss.artemis.demo.DemoExample;

public class ExJms5 implements DemoExample {
    public String getNumber() {
        return ExJms5Run.EX_NUMBER;
    }

    @Override
    public String getDescription() {
        return ExJms5Run.EX_DESCRIPTION;
    }

    @Override
    public void run(Logger logger) throws Exception {
        ExJms5Run.main(null);
    }
}