CORE-клиенты#
Протокол Core#
Core-протокол (Artemis CORE protocol) - это Java API, который используется внутренним ядром брокера Artemis, а также доступен на стороне клиентских приложений. Позволяет управлять большинством объектов брокера, реализует полный набор функций обмена сообщениями.
Предусловия использования#
Перед настройкой Core-клиента должны быть выполнены следующие пункты:
запуск брокера Artemis (описано подробнее в документе «Руководство по установке», раздел Установка);
создание адреса и очереди для записи и вычитки сообщений (описано подробнее в документе «Руководство по системному администрированию», раздел Создание адресов и очередей на брокере, выдача прав на них).
Подключение зависимостей#
Для использования клиентской библиотеки core-client на основе artemis core API необходимо подключить в проект зависимость:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${artemis.version}</version>
</dependency>
Описание интерфейсов основных сущностей клиентского Core API#
Основные интерфейсы клиентского Core API#
Интерфейс |
Краткое описание |
|---|---|
ServerLocator |
осуществляет поиск сервера на основе обновляемого списка серверов-членов топологии кластера |
ClientSessionFactory |
точка входа для создания и настройки ресурсов для отправки и получения сообщений |
ClientSession |
однопоточный родительский объект, необходимый для отправки и получения сообщений |
ClientConsumer |
получатель сообщений из очередей брокера |
ClientProducer |
отправитель сообщений на определенный адрес на брокере |
ClientMessage |
сообщение, отправленное или полученное через брокер |
TopologyMember |
предоставляет информацию о брокере участнике топологии кластера |
Интерфейсы слушателей и обработчиков клиентского Core API#
Интерфейс |
Краткое описание |
|---|---|
MessageHandler |
используется для асинхронного получения сообщений |
SendAcknowledgementHandler |
уведомляет, когда сервер подтверждает получение асинхронного сообщения |
ClusterTopologyListener |
уведомляет об изменениях топологии кластера |
FailoverEventListener |
уведомляет об изменениях соединения текущей сессии |
SessionFailureListener |
уведомляет клиента, когда в сессии произошел сбой |
Методы клиентского Core API#
Методы |
Краткое описание |
|---|---|
— ServerLocator — |
|
createServerLocator(String url) |
Создает объект ServerLocator |
— ClientSessionFactory — |
|
createSessionFactory() |
Создает объект ClientSessionFactory |
— ClientSession — |
|
createSession(String username, String password, boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, int ackBatchSize, String clientID) |
Создает сессию для поддержки работы с получением и отправкой событий брокеру |
— ClientConsumer — |
|
createConsumer(String queueName) |
Создает консьюмера в заданной очереди |
start() |
Стартует сессию для того, чтобы консьюмер смог начать получать сообщения с брокера |
receive() |
Вычитка сообщения с брокера в синхронном режиме |
setMessageHandler(MessageHandler handler) |
Регистрирует обработчик асинхронного получения сообщений с брокера |
— СlientProducer — |
|
createProducer(String address) |
Создает продюсера в заданном адресе |
send(String address, Message message) |
Отправляет сообщение в заданный адрес |
setSendAcknowledgementHandler(SendAcknowledgementHandler handler) |
Регистрирует обработчик уведомлений о получении сообщений на брокере |
— ClientMessage — |
|
createMessage(byte type, boolean durable) |
Создает сообщение |
setBodyInputStream(InputStream bodyInputStream) |
Устанавливает поток ввода, который будет использоваться для чтения тела сообщения при его отправке |
setOutputStream(OutputStream out) |
Устанавливает поток вывода, который будет использоваться для записи тела сообщения (неблокирующий метод) |
saveToOutputStream(OutputStream out) |
Используется для сохранения тела сообщения в поток вывода (блокирует выполнение до тех пор, пока все содержимое не будет передано в поток вывода) |
Server locator#
Core-клиенты используют объекты ServerLocator для поиска серверов и создания подключений к ним.
Аналогом ServerLocator в JMS является JMS Connection Factory.
Объекты ServerLocator создаются с использованием фабрики ActiveMQClient, например:
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://0.0.0.0:61616");
ClientSessionFactory#
Core-клиенты используют объекты ClientSessionFactory для подключения к серверу.
Аналогом ClientSessionFactory в JMS является JMS Connections.
Объекты ClientSessionFactory создаются с использованием класса ServerLocator, например:
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession#
Core-клиенты используют объекты ClientSession (сессии) для получения и отправки сообщений, для группировки их в транзакциях.
Объекты ClientSession могут поддерживать как транзакционную, так и нетранзакционную семантику, а также предоставлять интерфейс XAResource, чтобы операции обмена сообщениями могли выполняться в рамках стандартных Java-транзакций Java Transaction API.
Объекты ClientSession создаются с использованием класса ClientSessionFactory. Сессии группируют внутри себя объекты ClientConsumer (консьюмер) и ClientProducer (продюсер), например:
// Создаем сессию для поддержки работы с получением и отправкой событий брокеру
ClientSession session = factory.createSession();
// Создаем поставщик сообщений (продюсер) в адрес `test-address` на брокере
ClientProducer producer = session.createProducer("test-address");
// Создаем потребителель сообщений (консьюмер) из очереди `queue-1` на брокере
ClientConsumer consumer = session.createConsumer("queue-1");
// Создаем объект-сообщение для отправки в очередь
ClientMessage message = session.createMessage(true);
При инициализации объекта сессии можно указать уникальный идентификатор, который поможет в дальнейшем во время отладки и мониторинга клиентского приложения, например:
ClientSession session = factory.createSession(null, null, false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize(), "core-client-app-id");
Работу клиентского приложения теперь можно отслеживать в консоли управления брокера по значению Client ID = core-client-app-id во вкладках Connections, Consumers, and Producers.
Пример создания и настройки простого клиента приведен в примере 1 (см. ниже).
Обработчик SendAcknowledgementHandler#
Объекты сессии могут дополнительно регистрировать опциональный обработчик (объект класса, реализующего интерфейс SendAcknowledgementHandler),
через который клиентский код будет получать асинхронные уведомления о факте успешного или неудачного получения отправленных сообщений на стороне брокера, например:
// Устанавливаем размер окна подтверждения в положительное значение, чтобы заработала пересылка уведомлений в обработчик SendAcknowledgementHandler
locator.setConfirmationWindowSize(1000);
// Создаем сессию
ClientSession session = locator.createSessionFactory().createSession();
// Регистрируем обработчик уведомлений о получении сообщений на брокере
session.setSendAcknowledgementHandler(message ->
System.out.println("Получено уведомление об успешном приеме на брокере сообщения: " + message)
);
ClientProducer#
Core-клиенты используют объекты ClientProducer (продюсеры) для отправки сообщений на определенный адрес брокера.
Затем на сервере сообщения уже направляются на очереди, которые связаны с данным адресом.
Для удобства, при создании продюсера можно сразу задать целевой адрес, на который будут маршрутизироваться все отправляемые методом send() сообщения, например:
// Создаем сессию для поддержки работы с потреблением и поставкой событий брокеру
ClientSession session = factory.createSession();
// Создаем поставщик сообщений (продюсер) в адрес `test-address` на брокере
ClientProducer producer = session.createProducer("test-address");
// Отправляем сообщение с помощью продюсера на заранее привязанный к продюсеру адрес `test-address`
producer.send(message);
Возможен также вариант указания требуемого адреса уже на этапе отправки сообщения, например:
// Создаем поставщик сообщений (продюсер)
ClientProducer producer = session.createProducer();
// Отправляем сообщение с помощью продюсера с указанием адреса назначения `test-address`
producer.send("test-address", message);
Специальные методы объекта локатора позволяют управлять темпом отправки сообщений:
// Задает максимальное количество отправленных сообщений, созданных на заданной фабрике
ServerLocator.setProducerMaxRate(int)
// Задает размер окна для ограничения потока продюсеров, создаваемых на заданной фабрике
ServerLocator.setProducerWindowSize(int)
Режимы отправки#
Существует два разных режима отправки сообщений продюсером: синхронный и асинхронный.
Синхронный режим#
Синхронный режим отправки означает, что продюсер будет блокироваться до тех пор, пока сообщение не будет получено брокером и сервер не уведомит клиента об этом.
Включать использование блокирующей семантики отправки можно с помощью следующих методов локатора (или соответствующих им URL-параметров):
ServerLocator.setBlockOnDurableSend(boolean)- отдельно для персистентных (durable) сообщений;ServerLocator.setBlockOnNonDurableSend(boolean)- отдельно для неперсистентных сообщений;
Например:
// Включаем блокировку на отправку сообщений брокеру
locator.setBlockOnDurableSend(true);
locator.setBlockOnNonDurableSend(true);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
// Создаем синхронный продюсер
ClientProducer syncProducer = session.createProducer("test-address");
// Отправляем сообщение с помощью продюсера в синхронном режиме
syncProducer.send(message);
Необходимо помнить, что производительность отправки сообщений в синхронном режиме будет существенно ограничена сетевой задержкой
по транспортировке сообщения и получению обратного подтверждения от сервера Round Trip Time (RTT) и не будет зависеть от пропускной способности сети.
Для того чтобы избежать подобных издержек, необходимо воспользоваться возможностью асинхронного режима отправки, или использовать транзакции с отправкой в пакетном режиме.
Асинхронный режим#
Асинхронный режим отправки означает, что продюсер не будет блокироваться и может продолжать работу, пока сообщение будет доставляться на сервер.
При этом уведомления (acknowledgement) о получении сообщения на брокере будут в ответ пересылаться в отдельном потоке через callback-обработчик SendAcknowledgementHandler клиентского приложения.
Асинхронный режим может быть полезен при работе с большим количеством сообщений, для уменьшения количества запросов к серверу и увеличения производительности.
Для включения асинхронного режима отправки необходимо зарегистрировать клиентский обработчик и установить положительное значение параметра подключения confirmationWindowSize.
Сделать это можно методом setSendAcknowledgementHandler() объекта сессии сразу для всех продюсеров, создаваемых через этот экземпляр сессии.
Можно также передать кастомный обработчик вместе с отправляемым сообщением в методе send() продюсера и выполнить отправку в асинхронном режиме, например:
// Отправляем сообщение с помощью продюсера с кастомным обработчиком в асинхронном режиме
syncSessionProducer.send(message, recievedMessage ->
System.out.println("Получено уведомление об успешном приеме на брокере сообщения: " + recievedMessage)
ClientConsumer#
Core-клиенты используют объекты ClientConsumer (консьюмеры) для вычитки (потребления) сообщений из очереди брокера.
После вычитки сообщения консьюмером происходит его обработка и, когда этот процесс завершается, для каждого сообщения на сервер отправляется подтверждение (ack) его успешного получения.
Как только сообщение подтверждено, оно исчезает из очереди и становится недоступным для повторной доставки.
Если система выйдет из строя до того, как сервер обмена сообщениями получит подтверждение от пользователя, то после восстановления сообщение будет доступно для повторной доставки (redelivery) пользователю.
Core API поддерживает как синхронную, так и асинхронную семантику потребления сообщений.
Потребление в синхронном режиме осуществляется с использованием метода receive(), вызов которого приводит к блокировке клиентского приложения до момента получения сообщения (или истечении тайм-аута ожидания), например:
// Создаем потребителель сообщений (консьюмер) из очереди `queue-1` на брокере
ClientConsumer consumer = session.createConsumer("queue-1");
// Стартуем сессию, для того чтобы консьюмер смог начать получать сообщения с брокера
session.start();
// Вычитываем сообщение из брокера с помощью консьюмера в синхронном режиме
ClientMessage msgReceived = consumer.receive();
Потребление в асинхронном режиме производится при использовании транзакций или путем задания обработчика MessageHandler, например:
// Регистрируем обработчик асинхронного получения сообщений с брокера
consumer.setMessageHandler(message ->
System.out.println("Консьюмер получил с брокера сообщение: " + message.getBodyBuffer().readString())
);
См. ниже пример 2.
ClientMessage#
Core-клиенты могут работать с разными типами сообщений. Любое сообщение состоит из тела и служебных заголовков (набора пар ключ-значение). Тело сообщения является буфером, предоставляющим методы чтения и записи данных.
По способу представления тела сообщения, различают:
строковые сообщения (содержат текст в кодировке UTF-8), например:
// Создаем текстовое UTF-8 сообщение для отправки в очередь
ClientMessage messageStrUtf8 = session.createMessage(ClientMessage.TEXT_TYPE, true);
String utf8Body = "Тело текстового сообщения в кодировке UTF-8";
TextMessageUtil.writeBodyText( ((CoreMessage) messageStrUtf8).getBodyBuffer(), new SimpleString(utf8Body));
бинарные сообщения (содержат произвольный массив байтов, в том числе и текст в кодировке, отличной от UTF-8), например:
// Создаем бинарное сообщение с текстом в кодировке Windows-1251 для отправки в очередь в виде массива байт
ClientMessage messageStrWin1251 = session.createMessage(ClientMessage.BYTES_TYPE, true);
byte[] win1251Body = "Тело бинарного сообщения - текст в кодировке Windows-1251".getBytes("Windows-1251");
messageStrWin1251.getBodyBuffer().writeBytes(win1251Body);
См. ниже пример 3.
По размеру сообщения делятся на:
обычные (regular), если их размер меньше, чем указано в настройке клиентского транспорта
minLargeMessageSize(по умолчанию 100KiB);большие (large), если их размер больше или равен значению
minLargeMessageSize.
Разница между обычными и большими сообщениями заключается в том, что большие сообщения в процессе передачи могут быть разделены на несколько частей (с целью оптимизации). На сервере большие сообщения сохраняются в отдельных файлах и максимальный размер больших сообщений фактически не лимитирован.
Отдельно для больших сообщений можно настраивать сжатие на клиенте в настройке compressLargeMessage.
При этом перед отправкой на сервер сообщения, его тело будет архивироваться по алгоритму сжатия ZIP.
Степень сжатия задается отдельно в настройке compressionLevel в пределах от 0 до 9, по умолчанию используется значение -1 (это соответствует уровню 6-7).
Чем больше уровень сжатия, тем меньше трафик сообщений, но больше время на их сжатие и декомпрессию на клиенте.
Если после сжатия размер сообщения станет меньше, чем minLargeMessageSize, то оно будет отправлено как обычное (regular) сообщение.
Работать с большими сообщения на клиенте можно как обычным способом (через буфер), так и с помощью потоковых методов класса ClientMessage:
setBodyInputStream(InputStream)- устанавливает поток ввода, который будет использоваться для чтения тела сообщения при его отправке;setOutputStream(OutputStream)- устанавливает поток вывода, который будет использоваться для записи тела сообщения (неблокирующий метод);saveOutputStream(OutputStream)- используются для сохранения тела сообщения в поток вывода (блокирует выполнение до тех пор, пока все содержимое не будет передано в поток вывода).
// Формируем тело большого сообщения через потоки
byte[] msgBody = "Тело большого сообщения ".repeat(2500).getBytes(StandardCharsets.UTF_8);
ClientMessage largeMessage = session.createMessage(true);
InputStream inputStream = new ByteArrayInputStream(msgBody);
largeMessage.setBodyInputStream(inputStream);
// Отправляем сообщение на брокер
producer.send(largeMessage);
ClientMessage messageReceived = consumer.receive();
// Получаем тело большого сообщения через потоки
OutputStream outputStream = new ByteArrayOutputStream();
messageReceived.saveToOutputStream(outputStream);
См. ниже пример 4.
Особенности настройки клиентского подключения#
Рассмотрим подробно возможности конфигурирования параметров клиентского подключения к брокеру Артемиса. Передача параметров клиентского подключения к брокеру может быть выполнена разными способами.
Настройка с помощью url#
Необходимые параметры подключения могут быть перечислены в момент создания объекта локатора списком в формате URL-строки вида tcp://host::port?TranspotConfiguration, например:
String url = "tcp://0.0.0.0:61616?protocols=CORE;supportAdvisory=false;suppressInternalManagementObjects=false;tcpReceiveBufferSize=1048576;tcpSendBufferSize=1048576;useEpoll=true;sslEnabled=false;verifyHost=false;needClientAuth=true";
// Создаем экземпляр локатора с помощью URl
ServerLocator locator = ActiveMQClient.createServerLocator(url);
После инициализации объекта локатора можно продолжить конфигурирование параметров подключения через сеттеры локатора.
Однако использовать сеттеры для модификации настроек разрешено только до момента создания экземпляра фабрики ClientSessionFactory,
в противном случае будет выброшено исключение IllegalStateException, например:
String url = "tcp://0.0.0.0:61616?protocols=CORE;supportAdvisory=false;suppressInternalManagementObjects=false;tcpReceiveBufferSize=1048576;tcpSendBufferSize=1048576;useEpoll=true;sslEnabled=false;verifyHost=false;needClientAuth=true";
// Создаем экземпляр локатора с помощью URl
ServerLocator locator = ActiveMQClient.createServerLocator(url);
// Настраиваем дополнительные параметры подключения
locator.setConfirmationWindowSize(1000); // размер окна подтверждения
locator.setMaxRetryInterval(30000); // максимальный интервал между попытками переподключения
locator.setConsumerMaxRate(100); // максимальная скорость потребления сообщений
locator.setProducerMaxRate(200); // максимальная скорость производства сообщений
// Создаем фабрику сессий для подключения к брокеру
ClientSessionFactory factory = locator.createSessionFactory();
// После создания инициализации фабрики вызов сеттеров локатора уже не допустим!!!
locator.setGroupID("group-id"); // будет выброшено исключение IllegalStateException: Cannot set attribute on SessionFactory after it has been used
Описание методов конфигурации#
Рассмотрим подробнее методы конфигурации в разрезе по настраиваемым сущностям:
Методы конфигурирования |
Краткое описание |
|---|---|
— СlientProducer — |
|
setAutoGroup(boolean autoGroup) |
Устанавливает, будут ли продюсеры автоматически назначать идентификатор группы отправленным сообщениям |
setBlockOnDurableSend(boolean blockOnDurableSend) |
Устанавливает, будут ли продюсеры блокироваться при отправке персистентных сообщений или будут работать в асинхронном режиме |
setBlockOnNonDurableSend(boolean blockOnNonDurableSend) |
Устанавливает, будут ли продюсеры блокироваться при отправке неперсистентных сообщений или будут работать в асинхронном режиме |
setGroupID(String groupID) |
Задает идентификатор группы, который будет установлен на каждом сообщении |
addOutgoingInterceptor(Interceptor interceptor) |
Добавляет исходящий перехватчик (интерцептор), который будет выполнен до того, как пакеты будут отправлены на сервер |
removeOutgoingInterceptor(Interceptor interceptor) |
Удаляет исходящий перехватчик |
setOutgoingInterceptorList(String interceptorList) |
Задает список исходящих перехватчиков |
setProducerMaxRate(int producerMaxRate) |
Устанавливает ограничение на максимальную скорость производства сообщений (число сообщений в секунду), |
setProducerWindowSize(int producerWindowSize) |
Устанавливает размер окна отправки (буфера сообщений в байтах) для сокращения числа запросов к серверу при работе с большим количеством сообщений |
setCompressionLevel(int compressionLevel) |
Устанавливает уровень сжатия сообщений в пределах от |
setCompressLargeMessage(boolean compressLargeMessages) |
Устанавливает, будет ли использоваться сжатие для больших сообщений |
— СlientConsumer — |
|
setAckBatchSize(int ackBatchSize) |
Устанавливает размер батча с подтверждениями, отправляемыми в пакетном режиме (за один раз) для сокращения числа запросов к серверу |
setBlockOnAcknowledge(boolean blockOnAcknowledge) |
Устанавливает, будут ли консьюмеры блокироваться при отправке подтверждений или будут работать в асинхронном режиме |
setCacheLargeMessagesClient(boolean cached) |
Устанавливает, будут ли большие сообщения, принятые консьюмерами, закешированы во временных файлах или нет |
setConsumerMaxRate(int consumerMaxRate) |
Устанавливает ограничение на максимальную скорость потребления сообщений (число сообщений в секунду), |
setConsumerWindowSize(int consumerWindowSize) |
Устанавливает размер окна получения (буфера предзагрузки сообщений в байтах) для сокращения числа запросов к серверу при работе с большим количеством сообщений |
setConfirmationWindowSize(int confirmationWindowSize) |
Устанавливает размер буфера окна подтверждения в байтах |
addIncomingInterceptor(Interceptor interceptor) |
Добавляет входящий перехватчик (интерцептор), который будет выполнен после того, как будут получены пакеты с сервера |
removeIncomingInterceptor(Interceptor interceptor) |
Удаляет входящий перехватчик |
setIncomingInterceptorList(String interceptorList) |
Задает список входящих перехватчиков |
setPreAcknowledge(boolean preAcknowledge) |
Установливает флаг предварительного подтверждения, который указывает, должны ли сообщения автоматически подтверждаться при получении или следует это делать явно при успешной обработке. Полезно при работе с большим количеством сообщений для увеличения производительности |
— СlientSessionFactory — |
|
setCallTimeout(long callTimeout) |
Устанавливает тайм-аут для операции вызова. Если операция не выполнится в течение этого времени, будет выброшено исключение CallTimeoutException |
setClientFailureCheckPeriod(long clientFailureCheckPeriod) |
Задает период (в мс) периодической проверки (ping) состояния клиента на наличие ошибок или сбоев |
setConnectionLoadBalancingPolicyClassName(String loadBalancingPolicyClassName) |
Устанавливает имя класса политики балансировки сообщений между соединениями |
setConnectionTTL(long connectionTTL) |
Устанавливает времени жизни неактивного соединения |
setCallFailoverTimeout(long callFailoverTimeout) |
Устанавливает тайм-аут для операции вызова при переключении на резервное соединение. Если операция не выполнится в течение этого времени, будет выброшено исключение CallTimeoutException |
setFailoverAttempts(int attempts) |
Устанавливает количество попыток переключения на резервное соединение, которое будет использоваться при возникновении сбоя связи или переключении на резервное соединение |
setInitialMessagePacketSize(int size) |
Устанавливает начальный размер пакета сообщений, который будет использоваться для отправки |
setMinLargeMessageSize(int minLargeMessageSize) |
Устанавливает минимальный размер большого сообщения, который будет использоваться для определения, является ли сообщение большим или нет |
setPasswordCodec(String passwordCodec) |
Устанавливает класс, который будет использоваться для кодирования и декодирования паролей |
setProtocolManagerFactory(ClientProtocolManagerFactory protocolManager) |
Устанавливает фабрику менеджеров протоколов |
setInitialConnectAttempts(int reconnectAttempts) |
Устанавливает количество начальных попыток подключения |
setReconnectAttempts(int reconnectAttempts) |
Устанавливает максимальное количество попыток переподключения, перед ошибкой |
setRetryInterval(long retryInterval) |
Устанавливает интервал повторения, который будет использоваться при повторных операциях |
setRetryIntervalMultiplier(double retryIntervalMultiplier) |
Устанавливает множителлль интервала повторения |
setMaxRetryInterval(long maxRetryInterval) |
Устанавливает максимальный интервал повторения, который будет использоваться при повторных операциях |
setScheduledThreadPoolMaxSize(int scheduledThreadPoolMaxSize) |
Устанавливает максимальный размер пула потоков планировщика |
setThreadPoolMaxSize(int threadPoolMaxSize) |
Устанавливает максимальный размер пула потоков |
setUseGlobalPools(boolean useGlobalPools) |
Устанавливает флаг использования глобальных пулов потоков |
setFlowControlThreadPoolMaxSize(int flowControlThreadPoolMaxSize) |
Устанавливает максимальный размер пула нитей потока управления |
— ClusterTopologyListener — |
|
setUseTopologyForLoadBalancing(boolean useTopologyForLoadBalancing) |
Устанавливает флаг использования топологии для балансировки нагрузки |
Примечания:
в поставку дистрибутива SMBX входит реализация интерцепторов подписи сообщений;
при указании значения размера Window-буфера (setConsumerWindowSize, setConfirmationWindowSize) предполагается, что:
-1позволяет задать неограниченный по размеру буфер для любого количества сообщений (следует с осторожностью использовать только для «быстрых» консьюмеров воизбежании переполнения буфера памяти);0означает, что буфер не используется и сообщения принимаются с сервера/отправляются на сервер немедленно;> 0означает, что буфер имеет заданный максимальный размер в байтах и сообщения прекращают отправляться, когда размер буфера достигает этого значения.
Режимы коммитов#
Для обеспечения гарантии доставки сообщений, брокер требует от клиента подтверждения факта успешности вычитки каждого сообщения. Выполняется это на клиенте путем формирования оповещений (ack) на объектах-сообщениях ClientMessage, с последующей их фиксацией с помощью коммитов на объекте-сессии ClientSession. Как только подтверждение будет получено сервером, сообщение исчезнет из очереди и станет недоступно для повторной доставки. Если же на клиенте произойдет сбой до того, как сервер получит подтверждение, то после восстановления работоспособности клиента сообщение будет доступно для повторной доставки (redelivery). Данный процесс сделан поэтапным с целью возможности гибкого управления минимизации издержек на доставку уведомлений при больших нагрузках.
Существует два режима коммитов:
индивидуальное подтверждение (
individual ack) - производится методомindividualAcknowledge()объекта классаClientMessage. Если для сессии, отвечающей за подтверждение данного сообщения, настроены автоматические коммиты (autoCommitAcks=true), то вызов методаindividualAcknowledge()приведет к фиксации сессии с передачей индивидуального оповещения по данному сообщению на сервер. В противном случае (autoCommitAcks=false), это подтверждение не будет закоммичено до тех пор, пока клиент не зафиксирует текущую сессию явно методомcommit()объекта классаClientSession.подтверждение окном (
acknowledge) - производится методомacknowledge()объекта классаClientMessage. В данном случае при фиксации сессии подтверждения высылаются не отдельно для каждого сообщения, а для набора сообщений, совокупный размер которых укладывается в окно. Размер окна определяется параметромackBatchSizeдля текущей сессии (см. метод setAckBatchSize).
Для Core-клиента имеется возможность настройки вычитки без гарантии доставки сообщений.
Для этого необходимо при создании сессии установить в true значение параметра предварительного подтверждения (preAcknowledge).
В результате на стороне брокера будет автоматически выполняться подтверждение получения сообщения по факту его отправки.
Это позволит избежать дополнительного сетевого трафика и задержек на пересылку-прием уведомлений за счет отказа от гарантии доставки и, как следствия, возможной потери сообщений.
Транзакции#
Транзакции позволяют управлять отправкой или вычиткой произвольного набора сообщений совместно, одновременно фиксируя (commit) или откатывая (rollback) их все.
Транзакции привязаны к объекту-сессии (ClientSession) и распространяются на всех консьюмеров и продюсеров, созданных в рамках этого объекта-сессии.
Включение режима транзакций на сессии#
Транзакции можно включать отдельно на отправку и на вычитку.
Для включения транзакционного режима на отправку сообщений продюсером, необходимо при создании объекта-сессии методом createSession передать параметр autoCommitSends=false.
Для включения транзакционного режима на вычитку сообщений консьюмером, необходимо при создании объекта-сессии методом createSession передать параметр autoCommitAcks=false.
Сделать это явно можно через параметры метода фабрики сессий, например:
ClientSession session = factory.createSession(
null,
null,
false,
false, // autoCommitSends
false, // autoCommitAcks
false, // preAcknowledge
0, // ackBatchSize
"session-client-id"
);
Можно использовать специальный метод фабрики, создающий сессию с включенной по умолчанию транзакционностью и на отправку, и на вычитку сообщений, например:
ClientSession transactSession = factory.createTransactedSession();
См. ниже пример 5.
Примеры логирования#
Штатные логи классов клиентского Core API выводятся в режиме WARN и ERROR.
Например, INFO-логи примера 1 выглядят следующим образом:
[main] ru.sbt.ss.artemis.demo.DemoApp INFO -
=========
Пример : core_1
Описание: CORE-клиент с отправкой и вычиткой текстового сообщения
=========
[main] ru.sbt.ss.artemis.clients.core.demo.ExCore1 INFO - Продюсер отправил на брокер сообщение: Тело тестового сообщения
[main] ru.sbt.ss.artemis.clients.core.demo.ExCore1 INFO - Консьюмер получил с брокера сообщение: Тело тестового сообщения
Для того чтобы получить детальную информацию в логах, требуется перевести логирование в режим DEBUG или TRACE.
Вот как уже выглядит запуск примера 1 с переведенным в режим DEBUG логированием классов из пакета org.apache.activemq.artemis.core.client:
=========
Пример : core_1
Описание: CORE-клиент с отправкой и вычиткой текстового сообщения
=========
10:54:02.634 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying reconnection attempt 0/1
10:54:02.634 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying to connect with connectorFactory=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@41c2284a and currentConnectorConfig: TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false
10:54:04.085 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:54:04.085 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Connected with the currentConnectorConfig=TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false
10:54:04.097 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:54:04.110 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Reconnection successful
10:54:04.110 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - ClientSessionFactoryImpl received backup update for primary/backup pair = TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false / null but it didn't belong to TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false
10:54:04.202 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Requesting 32768 credits on address test-address, needed = 32768, arriving = 0
10:54:04.202 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Request 32768 credits on address test-address
10:54:04.239 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - not asking for -1 credits on test-address
10:54:04.239 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - AfterAcquire 114 credits on address test-address, pendingCredits=32654
10:54:04.284 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore1 INFO - Продюсер отправил на брокер сообщение: Тело тестового сообщения
10:54:04.294 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore1 INFO - Консьюмер получил с брокера сообщение: Тело тестового сообщения
10:54:04.294 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Calling close on session ClientSessionImpl [name=77cb3267-849a-11ef-b58b-a6e22cca6306, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@6497b078, metaData=()]@59cba5a
10:54:04.311 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - calling cleanup on ClientSessionImpl [name=77cb3267-849a-11ef-b58b-a6e22cca6306, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@6497b078, metaData=()]@59cba5a
Ниже приведены логи запуска примера 1 с переведенным в режим TRACE логированием классов из пакета org.apache.activemq.artemis.core.client:
=========
Пример : core_1
Описание: CORE-клиент с отправкой и вычиткой текстового сообщения
=========
10:41:17.116 [main] org.apache.activemq.artemis.core.client.impl.Topology TRACE - Topology@6b927fb CREATE
10:41:17.178 [main] org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl TRACE - Selecting connector from initial connectors.
10:41:17.186 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl TRACE - getConnectionWithRetry::1 with retryInterval = 2000 multiplier = 1.0
10:41:17.186 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying reconnection attempt 0/1
10:41:17.186 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Trying to connect with connectorFactory=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@f381794 and currentConnectorConfig: TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false
10:41:18.187 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:41:18.187 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Connected with the currentConnectorConfig=TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false
10:41:18.191 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:41:18.208 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:41:18.208 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:41:18.209 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:41:18.209 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler DEBUG - TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet
10:41:18.205 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl TRACE - returning RemotingConnectionImpl [ID=17db1e98, clientID=null, nodeID=null, transportConnection=org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection@6057aebb[ID=17db1e98, local= /[0:0:0:0:0:0:0:1%0]:54899, remote=0.0.0.0/[0:0:0:0:0:0:0:1]:41777]]
10:41:18.214 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@6f27a732::Subscribing Topology
10:41:18.215 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - Reconnection successful
10:41:18.215 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl DEBUG - ClientSessionFactoryImpl received backup update for primary/backup pair = TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false / null but it didn't belong to TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false
10:41:18.218 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl TRACE - NodeUp ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=null, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?enabledProtocols=TLSv1-2,TLSv1-3&trustStorePassword=****&tcpReceiveBufferSize=1048576&port=41777&keyStorePassword=****&sslEnabled=true&host=0-0-0-0&verifyHost=false&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&useEpoll=true&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576&needClientAuth=true&amqpCredits=1000&amqpMinLargeMessageSize=102400&supportAdvisory=false&amqpDuplicateDetection=true&amqpLowCredits=300&protocols=CORE,AMQP,MQTT,STOMP&suppressInternalManagementObjects=false], discoveryGroupConfiguration=null]::nodeID=e6f04323-7c09-11ef-9c0a-0a6982745d26, connectorPair=Pair[a=TransportConfiguration(name=3523e6fc-82ed-11ef-95c3-a6e22cca6306, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?needClientAuth=true&enabledProtocols=TLSv1-2,TLSv1-3&amqpCredits=1000&tcpReceiveBufferSize=1048576&keyStorePassword=****&supportAdvisory=false&amqpLowCredits=300&verifyHost=false&useEpoll=true&suppressInternalManagementObjects=false&trustStorePassword=****&port=41777&amqpMinLargeMessageSize=102400&sslEnabled=true&amqpDuplicateDetection=true&host=0-0-0-0&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&protocols=CORE,AMQP,MQTT,STOMP&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576, b=null]
10:41:18.220 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.Topology TRACE - Topology@6b927fb::NewMemberAdd nodeId=e6f04323-7c09-11ef-9c0a-0a6982745d26 member = TopologyMember[id=e6f04323-7c09-11ef-9c0a-0a6982745d26, connector=Pair[a=TransportConfiguration(name=3523e6fc-82ed-11ef-95c3-a6e22cca6306, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory)?needClientAuth=true&enabledProtocols=TLSv1-2,TLSv1-3&amqpCredits=1000&tcpReceiveBufferSize=1048576&keyStorePassword=****&supportAdvisory=false&amqpLowCredits=300&verifyHost=false&useEpoll=true&suppressInternalManagementObjects=false&trustStorePassword=****&port=41777&amqpMinLargeMessageSize=102400&sslEnabled=true&amqpDuplicateDetection=true&host=0-0-0-0&trustStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&protocols=CORE,AMQP,MQTT,STOMP&keyStorePath=/Users/sbt-bakulev-av/App/artemis/installdir/broker/ssl/artemis-vault-keystore-jks&tcpSendBufferSize=1048576, b=null], backupGroupName=null, scaleDownGroupName=null]
10:41:18.221 [Thread-0 (ActiveMQ-client-netty-threads)] org.apache.activemq.artemis.core.client.impl.Topology TRACE - Topology@6b927fb::prepare to send e6f04323-7c09-11ef-9c0a-0a6982745d26 to 0 elements
10:41:18.233 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits 32768 on address test-address, needed=32768, credits=32768, window=32768
10:41:18.234 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits on Address test-address, requesting=32768, arriving=0, balance=0
10:41:18.234 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Requesting 32768 credits on address test-address, needed = 32768, arriving = 0
10:41:18.234 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - Request 32768 credits on address test-address
10:41:18.247 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@1fb669c3{consumerContext=ActiveMQConsumerContext{id=0}, queueName=queue-1}:: being created at
10:41:18.259 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl TRACE - setAddress() Setting default address as test-address
10:41:18.259 [main] org.apache.activemq.artemis.core.client.impl.ClientProducerImpl TRACE - sendRegularMessage::ClientMessageImpl[messageID=0, durable=true, address=test-address,userID=null,properties=TypedProperties[]], Blocking=true
10:41:18.265 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits 114 on address test-address, needed=32768, credits=114, window=32768
10:41:18.265 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl TRACE - CheckCredits did not need it, balance=32768, arriving=0, needed=32768, getbalance + arriving < needed=false
10:41:18.265 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - not asking for -1 credits on test-address
10:41:18.265 [main] org.apache.activemq.artemis.core.client.impl.AbstractProducerCreditsImpl DEBUG - AfterAcquire 114 credits on address test-address, pendingCredits=32654
10:41:18.295 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore1 INFO - Продюсер отправил на брокер сообщение: Тело тестового сообщения
10:41:18.295 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@1fb669c3{consumerContext=ActiveMQConsumerContext{id=0}, queueName=queue-1}::receive(0, false)
10:41:18.299 [main] org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl TRACE - org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@1fb669c3{consumerContext=ActiveMQConsumerContext{id=0}, queueName=queue-1}::Returning ClientMessageImpl[messageID=8589936196, durable=true, address=test-address,userID=null,properties=TypedProperties[_AMQ_INGRESS_TIMESTAMP=1728114078268, _AMQ_VALIDATED_USER=broker]]
10:41:18.300 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore1 INFO - Консьюмер получил с брокера сообщение: Тело тестового сообщения
10:41:18.300 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - Calling close on session ClientSessionImpl [name=35245c2d-82ed-11ef-95c3-a6e22cca6306, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@6f27a732, metaData=()]@2d778add
10:41:18.310 [main] org.apache.activemq.artemis.core.client.impl.ClientSessionImpl DEBUG - calling cleanup on ClientSessionImpl [name=35245c2d-82ed-11ef-95c3-a6e22cca6306, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@6f27a732, metaData=()]@2d778add
Для детализации логирования без понижения уровня до DEBUG или TRACE в клиентском приложении можно использовать
логирование в различных обработчиках.
Вот как выглядит запуск примера 2, с дополнительным логированием в обработчиках MessageHandler и SendAcknowledgementHandler:
=========
Пример : core_2
Описание: CORE-клиент синхронная/асинхронная отправка, обработчики MessageHandler и SendAcknowledgementHandler
=========
17:59:52.131 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Продюсер сессии [ClientID=sync-app] готов синхронно отправить на брокер сообщение: ID=0, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:53.162 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Для сессии [ClientID=sync-app] получено синхронное уведомление об успешном приеме на брокере сообщения: ID=0, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:53.163 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Продюсер сессии [ClientID=sync-app] готов асинхронно отправить на брокер сообщение: ID=0, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:53.164 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Продюсер сессии [ClientID=sync-app] завершил асинхронную отправку на брокер сообщения: ID=0, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:53.164 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Продюсер сессии [ClientID=async-app] готов асинхронно отправить на брокер сообщение: ID=0, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:53.186 [Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Для сессии [ClientId=sync-app] получено уведомление об успешном приеме на брокере сообщения: ClientMessageImpl[messageID=0, durable=true, address=test-address,userID=null,properties=TypedProperties[]]
17:59:54.165 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Продюсер сессии [ClientID=async-app] завершил асинхронную отправку на брокер сообщения: ID=0, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:54.165 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - А теперь можно поискать в консоли управления брокера (на вкладках Connections, Consumers, Producers) информацию о клиенте по ClientID=sync-app и ClientID=async-app ...
17:59:54.186 [Thread-1 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.core.demo.ExCore2$1 INFO - Для сессии [ClientID=async-app] получено асинхронное уведомление об успешном приеме на брокере сообщения: ID=0
17:59:55.172 [Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.core.demo.ExCore2$2 INFO - Консьюмер сессии [ClientID=async-app] получил асинхронно с брокера сообщение: ID=8589936006, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:55.172 [Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.core.demo.ExCore2$2 INFO - Консьюмер сессии [ClientID=async-app] получил асинхронно с брокера сообщение: ID=8589936007, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
17:59:55.172 [Thread-0 (ActiveMQ-client-global-threads)] ru.sbt.ss.artemis.clients.core.demo.ExCore2$2 INFO - Консьюмер сессии [ClientID=async-app] получил асинхронно с брокера сообщение: ID=8589936008, содержимое=Тело тестового сообщения создано 2024-10-04T14:59:52.130096Z
18:00:24.171 [main] ru.sbt.ss.artemis.clients.core.demo.ExCore2 INFO - Все, 30 секунд истекло, наш клиент закрыл сессии и их ClientID исчезли из консоли управления брокера :(
Примеры готовых классов для демонстрации работы Core-клиента#
Пример 1. Пример создания и настройки простого клиента для отправки и чтения текстового сообщения#
ExCore1Run.java#
package ru.sbt.ss.artemis.clients.core;
import org.apache.activemq.artemis.api.core.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExCore1Run {
private static final Logger logger = LoggerFactory.getLogger(ExCore1Run.class);
// Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
public final static String URL = "tcp://0.0.0.0:61616";
// Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
public static String ADDRESS = "test-address";
public static String QUEUE = "queue-1";
public static String FQQN = ADDRESS + "::" + QUEUE;
public static final String EX_NUMBER = "core_1";
public static final String EX_DESCRIPTION = "CORE-клиент с отправкой и вычиткой текстового сообщения";
public static void main(String[] args) throws Exception {
logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);
// Создаем экземпляр локатора для обнаружения брокера
ServerLocator locator = ActiveMQClient.createServerLocator(URL);
locator.setPreAcknowledge(true); // Поддержка автоматических подтверждений получения сообщений
// Создаем фабрику сессий для выполнения клиентского подключения к брокеру
ClientSessionFactory factory = locator.createSessionFactory();
// Создаем сессию для поддержки работы с потреблением и поставкой событий брокеру
ClientSession session = factory.createSession();
// Создаем поставщик сообщений (продюсер) в адрес `test-address` на брокере
ClientProducer producer = session.createProducer(ADDRESS);
// Создаем потребитель сообщений (консьюмер) из очереди `queue-1` на брокере
ClientConsumer consumer = session.createConsumer(QUEUE);
// Создаем текстовое сообщение для отправки в очередь
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("Тело тестового сообщения");
// Отправляем сообщение с помощью продюсера
producer.send(message);
logger.info("Продюсер отправил на брокер сообщение: {}", message.getBodyBuffer().readString());
// Стартуем сессию, для того чтобы консьюмер смог начать получать сообщения с брокера
session.start();
// Вычитываем сообщение из брокера с помощью консьюмера
ClientMessage msgReceived = consumer.receive();
logger.info("Консьюмер получил с брокера сообщение: {}", msgReceived.getBodyBuffer().readString());
// Закрываем сессию после окончания работы с брокером
session.close();
}
}
ExCore1.java#
package ru.sbt.ss.artemis.clients.core.demo;
import org.slf4j.Logger;
import ru.sbt.ss.artemis.demo.DemoExample;
import ru.sbt.ss.artemis.clients.core.ExCore1Run;
public class ExCore1 implements DemoExample {
public String getNumber() {
return ExCore1Run.EX_NUMBER;
}
@Override
public String getDescription() {
return ExCore1Run.EX_DESCRIPTION;
}
@Override
public void run(Logger logger) throws Exception {
ExCore1Run.main(null);
}
}
Пример 2. Пример клиента с синхронной и асинхронной отправкой, обработчики MessageHandler и SendAcknowledgementHandler#
ExCore2Run.java#
package ru.sbt.ss.artemis.clients.core;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
public class ExCore2Run {
private static final Logger logger = LoggerFactory.getLogger(ExCore2Run.class);
// Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
public final static String URL = "tcp://0.0.0.0:61616";
// Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
public static String ADDRESS = "test-address";
public static String QUEUE = "queue-1";
public static String FQQN = ADDRESS + "::" + QUEUE;
public static final String EX_NUMBER = "core_2";
public static final String EX_DESCRIPTION = "CORE-клиент с синхронной и асинхронной отправкой, обработчики MessageHandler и SendAcknowledgementHandler";
public static void main(String[] args) throws Exception {
logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);
// Создаем экземпляр локатора для обнаружения брокера
ServerLocator locator = ActiveMQClient.createServerLocator(URL);
// Настраиваем дополнительные параметры подключения к брокеру
locator.setConfirmationWindowSize(1); // размер окна подтверждения должен быть >= 0, чтобы заработала пересылки уведомлений в обработчик SendAcknowledgementHandler
locator.setMaxRetryInterval(30000); // максимальный интервал между попытками переподключения
locator.setConsumerMaxRate(100); // максимальная скорость потребления сообщений
locator.setProducerMaxRate(200); // максимальная скорость производства сообщений
locator.setBlockOnDurableSend(true); // включена блокировка на отправку персистентных сообщений
locator.setBlockOnNonDurableSend(true); // включена блокировка на отправку неперсистентных сообщений
// Создаем фабрику сессий для выполнения клиентского подключения к брокеру
ClientSessionFactory factory = locator.createSessionFactory();
// Создаем сессию для поддержки работы с потреблением и поставкой событий брокеру в асинхронном режиме
var asyncSessionId = "async-app"; // ClientId для сессии с асинхронной отправкой и вычиткой сообщений
ClientSession asyncSession = factory.createSession(
null,
null,
false,
true,
true,
locator.isPreAcknowledge(),
locator.getAckBatchSize(),
asyncSessionId
);
// Создаем сессию для поддержки работы с поставкой событий брокеру в синхронном режиме
var syncSessionId = "sync-app"; // ClientId для сессии с синхронной отправкой
ClientSession syncSession = factory.createSession(
null,
null,
false,
true,
true,
locator.isPreAcknowledge(),
locator.getAckBatchSize(),
syncSessionId
);
// Регистрируем асинхронный обработчик уведомлений о получении сообщений на брокере
asyncSession.setSendAcknowledgementHandler(new SendAcknowledgementHandler() {
@Override
public void sendAcknowledged(Message message) {
logger.info("Для сессии [ClientID={}] получено асинхронное уведомление об успешном приеме на брокере сообщения: ID={}", asyncSessionId, message.getMessageID());
}
@Override
public void sendFailed(Message message, Exception e) {
logger.info("Для сессии [ClientID={}] получено асинхронное уведомление о неудачном приеме на брокере сообщения: ID={}", asyncSessionId, message.getMessageID());
SendAcknowledgementHandler.super.sendFailed(message, e);
}
});
// Создаем продюсер для асинхронной сессии
ClientProducer asyncSessionProducer = asyncSession.createProducer();
// Создаем продюсер для синхронной сессии
ClientProducer syncSessionProducer = syncSession.createProducer(ADDRESS);
// Создаем консьюмер для сессии с асинхронной отправкой сообщений
ClientConsumer asyncSessionConsumer = asyncSession.createConsumer(QUEUE);
// Регистрируем обработчик асинхронного получения сообщений с брокера
asyncSessionConsumer.setMessageHandler(new MessageHandler() {
@Override
// Каждый раз, когда консьюмер получает сообщение, будет вызываться метод `onMessage()` обработчика.
public void onMessage(ClientMessage message) {
try {
logger.info("Консьюмер сессии [ClientID={}] получил асинхронно с брокера сообщение: ID={}, содержимое={}", asyncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
message.acknowledge(); // Подтверждаем факт успешной вычитки сообщения
} catch (ActiveMQException e) {
throw new RuntimeException(e);
}
}
@Override
public void onMessageExpired(ClientMessage message) {
logger.info("Консьюмер сессии [ClientID={}] получил асинхронно с брокера просроченное сообщение: ID={}, содержимое={}", asyncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
MessageHandler.super.onMessageExpired(message);
}
});
// Создаем текстовое сообщение для отправки в очередь
ClientMessage message = asyncSession.createMessage(ClientMessage.TEXT_TYPE, true);
TextMessageUtil.writeBodyText( ((CoreMessage) message).getBodyBuffer(), new SimpleString("Тело тестового сообщения создано " + Instant.now()));
logger.info("Продюсер сессии [ClientID={}] готов синхронно отправить на брокер сообщение: ID={}, содержимое={}", syncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
Thread.sleep(5000);
// Отправляем сообщение с помощью продюсера в синхронном режиме
syncSessionProducer.send(message);
logger.info("Для сессии [ClientID={}] получено синхронное уведомление об успешном приеме на брокере сообщения: ID={}, содержимое={}", syncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер сообщение: ID={}, содержимое={}", syncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
// Отправляем сообщение с помощью продюсера с кастомным обработчиком в асинхронном режиме
syncSessionProducer.send(message, recievedMessage -> logger.info("Для сессии [ClientId={}] получено уведомление об успешном приеме на брокере сообщения: {}", syncSessionId, recievedMessage));
logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", syncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер сообщение: ID={}, содержимое={}", asyncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
// Отправляем сообщение с помощью продюсера в синхронном режиме
asyncSessionProducer.send(ADDRESS, message);
logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", asyncSessionId, message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
// Небольшая пауза перед стартом соединения для начала вычитки сообщений из очереди
logger.info("Небольшая пауза перед стартом сессии (подключением консьюмера)");
Thread.sleep(1000);
// Стартуем асинхронную сессию, для того чтобы консьюмер смог начать получать сообщения с брокера
asyncSession.start();
logger.info("Стартовала асинхронная сессия!");
// Стартовать синхронную сессию не обязательно, так как с ней не связан ни один консьюмер
// syncSession.start();
logger.info("А теперь можно поискать в консоли управления брокера (на вкладках Connections, Consumers, Producers) информацию о клиенте по ClientID={} и ClientID={} ...", syncSessionId, asyncSessionId);
// Задержка на время для поиска информации о клиенте в консоли управления брокера
Thread.sleep(30000);
logger.info("Все, 30 секунд истекло, наш клиент закрыл сессии и их ClientID исчезли из консоли управления брокера :(");
// Закрываем сессии после окончания работы с брокером
asyncSession.stop();
asyncSession.close();
syncSession.close();
}
}
ExCore2.java#
package ru.sbt.ss.artemis.clients.core.demo;
import org.slf4j.Logger;
import ru.sbt.ss.artemis.demo.DemoExample;
import ru.sbt.ss.artemis.clients.core.ExCore2Run;
public class ExCore2 implements DemoExample {
public String getNumber() {
return ExCore2Run.EX_NUMBER;
}
@Override
public String getDescription() {
return ExCore2Run.EX_DESCRIPTION;
}
@Override
public void run(Logger logger) throws Exception {
ExCore2Run.main(null);
}
}
Пример 3. Пример работы с сообщениями разных типов#
ExCore3Run.java#
package ru.sbt.ss.artemis.clients.core;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.time.Instant;
public class ExCore3Run {
private static final Logger logger = LoggerFactory.getLogger(ExCore3Run.class);
// Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
public final static String URL = "tcp://0.0.0.0:61616";
// Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
public static String ADDRESS = "test-address";
public static String QUEUE = "queue-1";
public static String FQQN = ADDRESS + "::" + QUEUE;
public static final String EX_NUMBER = "core_3";
public static final String EX_DESCRIPTION = "CORE-клиент с примерами работы с сообщениями разных типов (текстовые и бинарные)";
public static void main(String[] args) throws Exception {
logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);
// Создаем экземпляр локатора для обнаружения брокера
ServerLocator locator = ActiveMQClient.createServerLocator(URL);
locator.setPreAcknowledge(true); // Поддержка автоматических подтверждений получения сообщений
// Создаем фабрику сессий для выполнения клиентского подключения к брокеру
ClientSessionFactory factory = locator.createSessionFactory();
// Создаем сессию для поддержки работы с потреблением и поставкой событий брокеру
ClientSession session = factory.createSession();
// Создаем поставщик сообщений (продюсер) в адрес `test-address` на брокере
ClientProducer producer = session.createProducer(ADDRESS);
// Создаем потребителель сообщений (консьюмер) из очереди `queue-1` на брокере
ClientConsumer consumer = session.createConsumer(QUEUE);
// Регистрируем обработчик асинхронного получения сообщений с брокера
consumer.setMessageHandler(message -> {
switch (message.getType()) {
case ClientMessage.TEXT_TYPE: {
logger.info("Консьюмер получил с брокера текстовое сообщение: ID={}, содержимое={}", message.getMessageID(), TextMessageUtil.readBodyText(message.getDataBuffer()));
break;
}
case ClientMessage.BYTES_TYPE: {
var bytes = new byte[message.getBodyBuffer().readableBytes()];
message.getBodyBuffer().readBytes(bytes);
try {
String strWin1251 = new String(bytes, "Windows-1251");
logger.info("Консьюмер получил с брокера бинарное сообщение: ID={}, содержимое={}", message.getMessageID(), strWin1251);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
break;
}
default: {
logger.info("Консьюмер получил с брокера сообщение: ID={}", message.getMessageID());
}
}
});
// Создаем текстовое UTF-8 сообщение для отправки в очередь
ClientMessage messageStrUtf8 = session.createMessage(ClientMessage.TEXT_TYPE, true);
String utf8Body = "Тело текстового сообщения в кодировке UTF-8 создано " + Instant.now();
TextMessageUtil.writeBodyText( ((CoreMessage) messageStrUtf8).getBodyBuffer(), new SimpleString(utf8Body));
// Создаем бинарное сообщение с текстом в кодировке Windows-1251 для отправки в очередь в виде массива байт
ClientMessage messageStrWin1251 = session.createMessage(ClientMessage.BYTES_TYPE, true);
byte[] win1251Body = ("Тело бинарного сообщения - текст в кодировке Windows-1251 создано " + Instant.now()).getBytes("Windows-1251");
messageStrWin1251.getBodyBuffer().writeBytes(win1251Body);
// Отправляем сообщение с помощью продюсера
producer.send(messageStrUtf8);
logger.info("Продюсер отправил на брокер сообщение: {}", TextMessageUtil.readBodyText(messageStrUtf8.getDataBuffer()));
// Отправляем сообщение с помощью продюсера
producer.send(messageStrWin1251);
logger.info("Продюсер отправил на брокер сообщение: {}", messageStrWin1251);
// Стартуем сессию, для того чтобы консьюмер смог начать получать сообщения с брокера
session.start();
// Закрываем сессию после окончания работы с брокером
session.close();
}
}
ExCore3.java#
package ru.sbt.ss.artemis.clients.core.demo;
import org.slf4j.Logger;
import ru.sbt.ss.artemis.demo.DemoExample;
import ru.sbt.ss.artemis.clients.core.ExCore3Run;
public class ExCore3 implements DemoExample {
public String getNumber() {
return ExCore3Run.EX_NUMBER;
}
@Override
public String getDescription() {
return ExCore3Run.EX_DESCRIPTION;
}
@Override
public void run(Logger logger) throws Exception {
ExCore3Run.main(null);
}
}
Пример 4. Пример работы с большими сообщениями#
ExCore4Run.java#
package ru.sbt.ss.artemis.clients.core;
import org.apache.activemq.artemis.api.core.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
public class ExCore4Run {
private static final Logger logger = LoggerFactory.getLogger(ExCore4Run.class);
// Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
public final static String URL = "tcp://0.0.0.0:61616";
// Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
public static String ADDRESS = "test-address";
public static String QUEUE = "queue-1";
public static String FQQN = ADDRESS + "::" + QUEUE;
public static final String EX_NUMBER = "core_4";
public static final String EX_DESCRIPTION = "CORE-клиент с примерами работы с большими сообщениями через Streaming Core API";
public static void main(String[] args) throws Exception {
logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);
int COMPRESSION_LEVEL = 0; // Степень сжатия сообщений в пределах от `0` до `9`, по умолчанию используется значение `-1` (это соответствует уровню 6-7).
int LARGE_MSG_SIZE = 10000; // Кастомный минимальный размер большого сообщения в байтах
boolean COMPRESS_LARGE_MSG = true; // Включать сжатие больших сообщений?
double COMPRESS_FACTOR = !(COMPRESS_LARGE_MSG && COMPRESSION_LEVEL != 0)? 1.1 : 300; // Эмпирический коэффициент увеличения размера исходного сообщения, чтобы после компрессии с уровнем `5` размер был > minLargeMessageSize
// Создаем экземпляр локатора для обнаружения брокера
ServerLocator locator = ActiveMQClient.createServerLocator(URL);
locator.setPreAcknowledge(true); // Поддержка автоматических подтверждений получения сообщений
locator.setCompressionLevel(COMPRESSION_LEVEL); // Уровень сжатия сообщений
locator.setCompressLargeMessage(COMPRESS_LARGE_MSG); // Включаем сжатие больших сообщений
locator.setMinLargeMessageSize(LARGE_MSG_SIZE); // Кастомный минимальный размер большого сообщения
// Создаем фабрику сессий для выполнения клиентского подключения к брокеру
ClientSessionFactory factory = locator.createSessionFactory();
// Создаем сессию для поддержки работы с потреблением и поставкой событий брокеру
ClientSession session = factory.createSession();
// Создаем поставщик сообщений (продюсер) в адрес `test-address` на брокере
ClientProducer producer = session.createProducer(ADDRESS);
// Создаем потребителель сообщений (консьюмер) из очереди `queue-1` на брокере
ClientConsumer consumer = session.createConsumer(QUEUE);
// Формируем тело первого большого сообщения через потоки (Streaming Core API)
String utf8Text = "Тело большого сообщения № 1 с текстом в кодировке UTF-8 ";
String utf8TextLargeBodyText = utf8Text.repeat((int) (COMPRESS_FACTOR * LARGE_MSG_SIZE / utf8Text.length())); // доводим размер сообщения до БОЛЬШОГО (> minLargeMessageSize)
byte[] msgBody1 = utf8TextLargeBodyText.getBytes(StandardCharsets.UTF_8);
ClientMessage largeMessageWithStream = session.createMessage(true);
ByteArrayInputStream inputStream = new ByteArrayInputStream(msgBody1);
largeMessageWithStream.setBodyInputStream(inputStream);
logger.info("Создано большое сообщение №1 с текстом в кодировке UTF-8, тело (поток) размером: {} байт", largeMessageWithStream.getBodySize());
// Формируем тело второго большого сообщения через буфер
String win1251Text = "Тело большого сообщения №2 с текстом в кодировке Windows-1251 ";
String win1251LargeBodyText = win1251Text.repeat((int) (COMPRESS_FACTOR * LARGE_MSG_SIZE / win1251Text.length())); // доводим размер сообщения до БОЛЬШОГО (> minLargeMessageSize)
byte[] win1251LargeBodyBytes = win1251LargeBodyText.getBytes("Windows-1251");
ClientMessage largeMessageWithBuffer = session.createMessage(true);
largeMessageWithBuffer.getBodyBuffer().writeBytes(win1251LargeBodyBytes);
logger.info("Создано большое сообщение №2 с текстом в кодировке Windows-1251, тело (буфер) размером: {} байт", largeMessageWithBuffer.getBodySize());
// Отправляем оба сообщения на брокер
producer.send(largeMessageWithStream);
logger.info("Продюсер отправил на брокер большое сообщение №1: {}", largeMessageWithStream);
producer.send(largeMessageWithBuffer);
logger.info("Продюсер отправил на брокер большое сообщение №2: {}", largeMessageWithBuffer);
// Небольшая пауза перед стартом соединения для начала вычитки сообщений из очереди
// Можно убедиться, что сообщения успешно попали в очередь и имеют признак большого сообщения ("largeMessage": true)
var delaySec = 10;
logger.info("Небольшая пауза ({}сек) перед стартом сессии (подключением консьюмера)", delaySec);
Thread.sleep(delaySec * 1000);
// Стартуем сессию, для того чтобы консьюмер смог начать получать сообщения с брокера
session.start();
// Вычитываем сообщение №1 из брокера с помощью консьюмера
ClientMessage messageReceived1 = consumer.receive();
// Получаем тело большого сообщения через потоки
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
messageReceived1.saveToOutputStream(outputStream);
var text1 = outputStream.toString(StandardCharsets.UTF_8);
logger.info("Консьюмер получил с брокера большое сообщение с текстом в кодировке UTF-8({}байт): {}", text1.length(), text1);
// Вычитываем сообщение №2 из брокера с помощью консьюмера
ClientMessage messageReceived2 = consumer.receive();
// Получаем тело большого сообщения через буфер
var bytes = new byte[messageReceived2.getBodyBuffer().readableBytes()];
messageReceived2.getBodyBuffer().readBytes(bytes);
var text2 = new String(bytes, "Windows-1251");
logger.info("Консьюмер получил с брокера большое сообщение с текстом в кодировке Windows-1251({}байт): {}", text2.length(), text2);
logger.info("Небольшая пауза ({}сек) перед завершением работы", delaySec);
Thread.sleep(delaySec * 1000);
// Закрываем сессию после окончания работы с брокером
session.close();
}
}
ExCore4.java#
package ru.sbt.ss.artemis.clients.core.demo;
import org.slf4j.Logger;
import ru.sbt.ss.artemis.demo.DemoExample;
import ru.sbt.ss.artemis.clients.core.ExCore4Run;
public class ExCore4 implements DemoExample {
public String getNumber() {
return ExCore4Run.EX_NUMBER;
}
@Override
public String getDescription() {
return ExCore4Run.EX_DESCRIPTION;
}
@Override
public void run(Logger logger) throws Exception {
ExCore4Run.main(null);
}
}
Пример 5. Пример работы с транзакциями#
ExCore5Run.java#
package ru.sbt.ss.artemis.clients.core;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
public class ExCore5Run {
private static final Logger logger = LoggerFactory.getLogger(ExCore5Run.class);
// Предполагается, что брокер Artemis запущен на локальной машине на открытом порту (plain TCP)
public final static String URL = "tcp://0.0.0.0:61616";
// Предполагается, что на брокере предварительно должен быть создан адрес `test-address` с очередью `queue-1`
public static String ADDRESS = "test-address";
public static String QUEUE = "queue-1";
public static String FQQN = ADDRESS + "::" + QUEUE;
public static final String EX_NUMBER = "core_5";
public static final String EX_DESCRIPTION = "CORE-клиент с транзакциями";
public static void main(String[] args) throws Exception {
logger.info("{} ({})", EX_DESCRIPTION, EX_NUMBER);
// Создаем экземпляр локатора для обнаружения брокера
ServerLocator locator = ActiveMQClient.createServerLocator(URL);
// Настраиваем дополнительные параметры подключения к брокеру
locator.setAckBatchSize(1); // Устанавливает размер буфера для подтверждений
// Создаем фабрику сессий для выполнения клиентского подключения к брокеру
ClientSessionFactory factory = locator.createSessionFactory();
// Создаем транзакционную сессию для поддержки работы с потреблением и поставкой событий брокеру
// ClientSession implicitTransactSession = factory.createTransactedSession(); // сессия с транзакцией
var transactSessionId = "transact-app"; // ClientId для сессии с транзакцией
ClientSession transactSession = factory.createSession(
null,
null,
false,
false, // autoCommitSends
false, // autoCommitAcks
locator.isPreAcknowledge(), // preAcknowledge
locator.getAckBatchSize(), //ackBatchSize
transactSessionId
);
// Создаем продюсер для транзакционной сессии
ClientProducer producer = transactSession.createProducer(ADDRESS);
// Создаем консьюмер для транзакционной сессии
ClientConsumer consumer = transactSession.createConsumer(QUEUE);
// Создаем текстовые сообщения для отправки в очередь
ClientMessage message1 = transactSession.createMessage(ClientMessage.TEXT_TYPE, true);
TextMessageUtil.writeBodyText( ((CoreMessage) message1).getBodyBuffer(), new SimpleString("Тело тестового сообщения №1 создано " + Instant.now()));
ClientMessage message2 = transactSession.createMessage(ClientMessage.TEXT_TYPE, true);
TextMessageUtil.writeBodyText( ((CoreMessage) message2).getBodyBuffer(), new SimpleString("Тело тестового сообщения №2 создано " + Instant.now()));
ClientMessage message3 = transactSession.createMessage(ClientMessage.TEXT_TYPE, true);
TextMessageUtil.writeBodyText( ((CoreMessage) message3).getBodyBuffer(), new SimpleString("Тело тестового сообщения №3 создано " + Instant.now()));
// Стартуем транзакционную сессию
transactSession.start();
// Отправляем сообщения
logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер сообщение: ID={}, содержимое={}", transactSessionId, message1.getMessageID(), TextMessageUtil.readBodyText(message1.getDataBuffer()));
// Отправляем сообщение №1 с помощью продюсера в асинхронном режиме
producer.send(message1);
logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", transactSessionId, message1.getMessageID(), TextMessageUtil.readBodyText(message1.getDataBuffer()));
logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер сообщение: ID={}, содержимое={}", transactSessionId, message2.getMessageID(), TextMessageUtil.readBodyText(message2.getDataBuffer()));
// Отправляем сообщение №2 с помощью продюсера в асинхронном режиме
producer.send(message2);
logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", transactSessionId, message2.getMessageID(), TextMessageUtil.readBodyText(message2.getDataBuffer()));
logger.info("Продюсер сессии [ClientID={}] готов асинхронно отправить на брокер сообщение: ID={}, содержимое={}", transactSessionId, message3.getMessageID(), TextMessageUtil.readBodyText(message3.getDataBuffer()));
// Отправляем сообщение №3 с помощью продюсера в асинхронном режиме
producer.send(message3);
logger.info("Продюсер сессии [ClientID={}] завершил асинхронную отправку на брокер сообщения: ID={}, содержимое={}", transactSessionId, message3.getMessageID(), TextMessageUtil.readBodyText(message3.getDataBuffer()));
// Коммитим транзакцию (отправку сообщений)
logger.info("Коммитим транзакцию (отправку сообщений)");
transactSession.commit();
logger.info("Закоммичена транзакция (отправка сообщений)");
// Вычитываем сообщение №1
logger.info("Консьюмер начал вычитку");
ClientMessage msgReceived = consumer.receive();
logger.info("Консьюмер получил с брокера сообщение: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
// Подтверждаем вычитку сообщения №1
msgReceived.acknowledge();
logger.info("Подтверждено (ack) получение сообщения: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
// Коммитим транзакцию (получение сообщения №1 было подтверждено)
logger.info("Коммитим транзакцию (получение сообщения №1)");
transactSession.commit();
logger.info("Закоммичена транзакция (получение сообщения №1)");
// Вычитываем сообщение №2
logger.info("Консьюмер начал вычитку");
msgReceived = consumer.receive();
logger.info("Консьюмер получил с брокера сообщение: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
// Подтверждаем вычитку сообщения №2
msgReceived.acknowledge();
logger.info("Подтверждено (ack) получение сообщения: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
// Откатываем сессию, что приведет к переотправке сообщений с брокера
logger.info("Откатываем транзакцию");
transactSession.rollback();
logger.info("Завершено откатывание транзакции");
// Вычитываем сообщение №2 заново
logger.info("Консьюмер начал вычитку");
msgReceived = consumer.receive();
logger.info("Консьюмер получил заново (redelivering) после отката транзакции сообщение: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
msgReceived.acknowledge();
logger.info("Подтверждено (ack) получение сообщения: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
// Вычитываем сообщение №3
logger.info("Консьюмер начал вычитку");
msgReceived = consumer.receive();
logger.info("Консьюмер получил после отката транзакции сообщение: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
msgReceived.acknowledge();
logger.info("Подтверждено (ack) получение сообщения: ID={}, содержимое={}", msgReceived.getMessageID(), TextMessageUtil.readBodyText(msgReceived.getDataBuffer()));
// Коммитим транзакцию (получение сообщений №2 и №3 было подтверждено)
logger.info("Коммитим транзакцию (повторное получение сообщений №2 и №3)");
transactSession.commit();
logger.info("Закоммичена транзакция (повторное получение сообщений №2 и №3)");
logger.info("А теперь можно поискать в консоли управления брокера (на вкладках Connections, Consumers, Producers) информацию о клиенте по ClientID={} ...", transactSessionId);
// Задержка на время для поиска информации о клиенте в консоли управления брокера
Thread.sleep(30000);
logger.info("Все, 30 секунд истекло, наш клиент закрыл транзакционную сессию и ее ClientID исчез из консоли управления брокера :(");
// Закрываем сессии после окончания работы с брокером
transactSession.stop();
transactSession.close();
}
}
ExCore5.java#
package ru.sbt.ss.artemis.clients.core.demo;
import org.slf4j.Logger;
import ru.sbt.ss.artemis.demo.DemoExample;
import ru.sbt.ss.artemis.clients.core.ExCore5Run;
public class ExCore5 implements DemoExample {
public String getNumber() {
return ExCore5Run.EX_NUMBER;
}
@Override
public String getDescription() {
return ExCore5Run.EX_DESCRIPTION;
}
@Override
public void run(Logger logger) throws Exception {
ExCore5Run.main(null);
}
}