Примеры использования KafkaProducer#
Ниже приведен простой пример использования производителя для отсылки записей со строками, содержащими последовательности чисел в качестве пар «ключ-значение»:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
Производитель состоит из пула буферного пространства, содержащего в себе записи, которые еще не были переданы на сервер, а также из дополнительного потока I/O, ответственного за конвертацию этих записей в запросы и их передачу в кластер.
Внимание
Ошибка выполнения метода
producer.close()после использования производителя приведет к утечке ресурсов.
Метод send() — асинхронный. Он добавляет запись в буфер неотправленных записей и сразу возвращается. Это позволяет производителю собирать отдельные записи в одном месте, что повышает его эффективность.
Конфигурационный параметр acks контролирует критерии, по которым определяется завершенность запросов. Настройка all означает блокировку при полной фиксации записи. Это самая медленная, но наиболее надежная настройка.
Если запрос окажется неудачным, производитель может повторить его. Однако в примере параметр retries равен 0, и запросы повторяться не будут.
Примечание
Включение параметра
retriesможет привести к задвоению данных.
Производитель обрабатывает буферы необработанных записей для каждой партиции. Размер этих буферов определяется конфигурационным параметром batch.size. Увеличение размера буферов может увеличить количество записей, которое будет в них содержаться, однако это потребует больше памяти, поскольку на каждую активную партицию приходится по одному из таких буферов.
По умолчанию буфер доступен для отправки сразу, даже если он заполнен не до конца. Однако если нужно сократить количество запросов, можно установить параметр linger.ms в значение больше 0. Этот параметр заставит производителя ждать дополнительных записей в течение заданного количества миллисекунд. Это похоже на алгоритм Нейгла для протокола TCP. Например, в блоке кода выше 100 записей были бы отосланы одновременно, так как параметр linger.ms равен 1 мс. Однако это добавило бы задержку в 1 мс к ожиданию запросом дополнительных записей при не до конца заполненном буфере.
Внимание
Записи, появляющиеся близко друг к другу по времени, будут записываться в один и тот же пакет даже при
linger.ms=0, и при высокой нагрузке запись в пакет будет осуществляться независимо от значения параметраlinger.ms. Однако изменение значения данного параметра в большую сторону может привести к менее частым и более эффективным запросам при невысокой нагрузке за счет малой задержки.
Параметр buffer.memory контролирует количество памяти, доступное производителю для буферизации. Если скорость отсылки записей выше скорости передачи их на сервер, это приведет к переполнению буферного пространства. При переполнении буфера дальнейшие вызовы отправки будут заблокированы. Пороговое значение для времени блокировки определяется параметром max.block.ms. По истечении этого времени выдается исключение TimeoutException.
Параметры key.serializer и value.serializer отвечают за сериализацию объектов «ключ-значение», предоставляемых пользователем через ProducerRecord.
Примечание
Для простых строковых или байтовых типов можно использовать
ByteArraySerializerиStringSerializer.
Начиная с версии Corax 5.272.0, KafkaProducer поддерживает два дополнительных режима:
идемпотентный производитель — усиливает семантику доставки сообщений с
at least onceдоexactly once. Это означает, что повторы запросов производителем не будут вызывать задвоения данных;транзакционный производитель — позволяет приложению отсылать сообщения в несколько партиций и топиков атомарно.
Для включения идемпотентности необходимо установить значение true для параметра enable.idempotence. В этом случае параметр retries примет значение Integer.MAX_VALUE, а параметр acks примет значение all.
Примечания
Идемпотентность производителя не изменяет API, таким образом, имеющиеся приложения также не требуют изменений для корректной работы функции.
Для корректной работы идемпотентного режима необходимо избегать повторных отправок на уровне приложений, поскольку задвоение отменить невозможно. Для этого необходимо оставить параметр
retriesбез значения; оно автоматически станет равнымInteger.MAX_VALUE.Если метод
send(ProducerRecord)возвращает ошибку даже при бесконечном значении параметраretries(например, при сгорании сообщения в буфере до отправки), рекомендуется отключить производитель и проверить содержимое последнего созданного сообщения на предмет задвоения.Производитель может гарантировать идемпотентность только для сообщений, отправленных в одной сессии.
Чтобы использовать транзакционный производитель и сопутствующие API:
Установите значение параметра
transactional.id. Когда этот параметр будет задан, идемпотентность включится автоматически вместе с конфигурационными параметрами производителя, от которых зависит идемпотентность.Для повышения надежности сконфигурируйте топики, включенные в транзакции. Для этого значение параметра
replication.factorдолжно быть не меньше3, а значение параметраmin.insync.replicasдля таких топиков должно быть равно2.Для полной реализации гарантий транзакционности настройте потребители на чтение только завершенных сообщений.
Цель transactional.id — обеспечить возможность восстановления транзакций в нескольких сессиях одного экземпляра производителя. Обычно он происходит из идентификатора сегмента в партиционированном приложении с базой данных.
Все новые транзакционные API блокируются и при ошибке выдают исключения. Пример ниже показывает, каким образом можно использовать новые API. Он похож на пример выше. Различие состоит в том, что все 100 сообщений являются частью одной транзакции:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
Как показано на примере, на один производитель может приходиться лишь одна открытая транзакция. Все сообщения, отсылаемые между методами beginTransaction() и commitTransaction(), будут частью одной транзакции. Когда параметр transactional.id установлен, все сообщения, отосланные производителем, должны быть частью транзакции.
Транзакционный производитель использует исключения для сообщения ошибок. Таким образом, передавать функцию обратного вызова для метода producer.send() или вызывать метод .get() на возвращенном Future не требуется: KafkaException появится, если любой из вызовов метода producer.send() или транзакционных вызовов завершится неисправимой ошибкой во время транзакции.
При вызове producer.abortTransaction() при получении KafkaException можно убедиться в том, что все успешно завершенные записи будут помечены как отмененные, сохраняя таким образом гарантии транзакционности.