Kafka-clients с подключением к нескольким кластерам Kafka#
Producer#
Класс ru.sbt.ss.kafka.clients.producer.KafkaProducer реализует стандартный интерфейс: org.apache.kafka.clients.producer.Producer.
Дополнительные параметры конфигурации находятся в классе ru.sbt.ss.kafka.clients.producer.ExtendedProducerConfig.
Внутри используется несколько kafka-producers в зависимости от конфигурации. Вызовы методов клиента делегируются producers в зависимости от выбранной стратегии.
Стратегия
Fallback:
сообщения отправляются первым активным producer из списка до получения ошибки записи;
если активный producer получает ошибку записи — клиент переключается на использование следующего активного producer в списке, а текущий producer становится неактивным;
спустя таймаут, настраиваемый с помощью параметра
retry.delay(по умолчанию: 60000 мс) producer восстановится и клиент снова переключится на него;в качестве проверки соединения с кластером kafka используется метод
partitionsFor()— получение списка партиций для topic. Topic можно задать параметромhealthcheck.topic(по умолчанию системный topic:_consumer_offsets);если все producers получили ошибку записи и не успели восстановиться — будет выбрасывается исключение «All clients failed exception.» пока любой из клиентов не восстановится.
Стратегия
Balance-load:
сообщения отправляются активными producers по алгоритму
round-robin;если какой-либо из producer получает ошибку записи — этот producer становится неактивным, а сообщения распределяются между остальными активными producers;
спустя таймаут, настраиваемый с помощью параметра
retry.delay(по умолчанию: 60000 мс) producer восстановится и снова включится в список балансировки;в качестве проверки соединения с кластером kafka используется метод
partitionsFor()— получение списка партиций для topic. Topic можно задать параметромExtendedProducerConfig.HEALTH_CHECK_TOPIC_CONFIG(по умолчанию системный topic_consumer_offsets);если все producers получили ошибку записи и не успели восстановиться — будет выбрасывается исключение «All clients failed exception.» пока любой из клиентов не восстановится;
балансировку сообщений между клиентами можно настроить параметрами
.messages.per.cycle.
Для управления внутренними producers используется система акторов akka.
Дополнительная конфигурация producer#
Клиент поддерживает все доступные параметры стандартного kafka-producer, а также следующие дополнительные параметры (которые также описаны в классе ru.sbt.ss.kafka.clients.producer.ExtendedProducerConfig):
## Список имен используемых producers
# Для каждого клиента можно задать отдельные параметры, если в префиксе параметра указать его имя, например:
# client1.bootstrap.servers = localhost:9093
clients = producer1, producer2
## Стратегия распределения сообщений между producers (fallback или balance-load)
strategy = fallback
## Настройка балансировки сообщений между producers при использовании стратегии balance-load
# По умолчанию messages.per.cycle=1 для всех клиентов — равномерное распределние сообщений по алгоритму round-robin.
# Например для 75%/25% балансировки можно отправлять 3 сообщения первому producer и 1 сообщение второму producer за одну итерацию.
# producer1.messages.per.cycle = 3
# producer2.messages.per.cycle = 1
## Задержка в мс, после которой неактивный producer (после ошибки записи) предпримет попытку восстановления
retry.delay = 60000
## Использовать healthcheck (получения списка партциий методом partitionsFor()) при попытке восстаноления producer.
# Если вызов вернет ошибку — клиент не восстановится и предпримет следущую попытку восстанвления через интервал retry.delay
healthcheck.on.retry.enabled = true
## Настройка topic для healthcheck (получения списка партциий методом partitionsFor())
healthcheck.topic = _consumer_offsets
## Настройка количества попыток переотправки сообщения разными продьсерами, по умолчанию равно количество producers.
send.retries = 2
## Максимальное количество сообщений, которое могут одновременно находиться в обработке клиентом.
# Используется для ограничения используемой памяти, аналогично настройке producer buffer.memory.
# При превышении лимита запись (вызов producer.send()) заблокируется, пока не освободится место в очереди (queue.unlock.threshold) или не истечет таймаут entrypoint.max.block.ms
max.queue.size = 15000
## Коэффициент заполненности очереди сообщений max.queue.size, при котором разблокируется запись, если она ранее была заблокирована из-за превышения max.queue.size
# Для очереди по умолчанию 15000 * 0.75 — запись разблокируется когда в очереди останется 11250 сообщений
queue.unlock.threshold = 0.75
## Аналогично стандартной натройке max.block.ms — если запись заблокирована более этого таймаута — будет выброшено исключение.
# По умолчанию сумма стандартных параметров max.block.ms и delivery.timeout всех producers.
entrypoint.max.block.ms = 360000
## Путь до файла с конфигурацией akka
# Пример настройки с значениями по умолчанию находится в ресурсах приложения `resources/producer-reference.conf`
akka.conf = path/to/akka.conf
## Имя системы акторов akka (среды исполнения)
# Несколько клиентов могут ипользовать одну среду исполнения akka по имени.
actor.system.name = producer-actor-system
## Таймаут остановки среды исполнения akka в мс, если она не используется ни одним клиентом
actor.system.shutdown.delay.ms = 1800000
## Таймаут завершения producer.
shutdown.timeout.ms = 5000
Пример использования#
package ru.sbt.ss.kafka.clients.producer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import ru.sbt.ss.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
// Пример использования producer с подключением к двум кластерам и стратегией fallback
public class HighAvailabilityProducerExample {
public static void main(String[] args) {
Map<String, Object> configuration = Map.of(
// Стратегия использования клиентов
ExtendedProducerConfig.STRATEGY_CONFIG, ExtendedProducerConfig.FALLBACK_STRATEGY,
// Список имен используемых клиентов
ExtendedProducerConfig.CLIENTS_CONFIG, "client1, client2",
// Общие параметры для всех клиентов, стандартные настройки
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName(),
// Отдельные настройки для клиента с именем client1
"client1." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-1:9092",
// Отдельные настройки для клиента с именем client2
"client2." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-2:9092"
// Отдельные настройки для клиентов можно вынести в отдельный файл
// В файлах настройки указываются без префикса с именем клиента
// "client1." + ExtendedProducerConfig.CLIENT_CONFIGURATION_CONFIG, "path/to/client-1.properties",
// "client2." + ExtendedProducerConfig.CLIENT_CONFIGURATION_CONFIG, "path/to/client-2.properties"
);
// producer используется c с помощью стандартного интерфейса org.apache.kafka.clients.producer.Producer
Producer<String, String> producer = new KafkaProducer<>(configuration);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");
// Сообщение отправится первым активным producer
producer.send(record).get();
}
}
Список методов клиента#
/**
* Отправить сообщение с помощью одного из активных producer в зависимости от выбранной стратегии.
* В ответе RecordMetadata в имя topic будет добавлен префикс с именем клиента, отправившего сообщение.
* Порядок отправки сообщений может быть нарушен при переключении внутренних producers.
*/
send(record: ProducerRecord[K, V]): Future[RecordMetadata]
/**
* Отправить сообщение с помощью одного из активных producers в зависимости от выбранной стратегии.
* В ответе RecordMetadata в имя topic будет добавлен префикс с именем клиента, отправившего сообщение.
* Порядок отправки сообщений может быть нарушен при переключении внутренних producers.
*/
send(record: ProducerRecord[K, V], callback: Callback): Future[RecordMetadata]
/**
* Заблокировать поток до тех пор, пока все сообщения не будут отправлены.
*
* В отличии от оригинального метода:
* 1. Дождется отправки даже тех сообщений, которые были отправлены после вызова метода в другом потоке.
* 2. Не вызывает оригинальный метод flush() на всех producers, т.е. producers дождутся linger.ms перед отправкой пачек.
*/
flush(): Unit
/**
* Инициализировать транзакционный режим на всех producers.
*/
initTransactions(): Unit
/**
* Открыть новую транзакцию на активном пproducer. Замена producers в транзакционном режиме происходит только во время исполнения этого метода.
*/
beginTransaction(): Unit
/**
* Подтвердить (commit) текущую транзакцию.
*
* При получении ошибки активный producer не меняется - можно попробовать commit еще раз или вызвать метод abortTransaction()
*
* Также возможно вызвать метод beginTransaction(), в таком случае новая транзакця инициализируется на новом producer, текущая транзакция будет потеряна.
*/
commitTransaction(): Unit = producers.commitTransaction()
/**
* Откатить текущую транзакцию.
*
* При получении ошибки активный producer не меняется - можно попробовать откатить транщакцию еще раз.
* Также возможно вызвать метод beginTransaction(), в таком случае новая транзакця инициализируется на новом пproducer, текущая транзакция будет потеряна.
*/
abortTransaction(): Unit
/**
* Закрыть producer
*/
close(): Unit
/**
* Закрыть producer
*/
close(timeout: Duration): Unit
Нереализованные методы#
/**
* Получить список партиций
*/
partitionsFor(topic: String): util.List[PartitionInfo]
/**
* Получить метрики клиента
*/
metrics(): util.Map[MetricName, _ <: Metric]
/**
* Добавить прочитанные оффсеты в текущую транзакцию
*/
sendOffsetsToTransaction(offsets: util.Map[TopicPartition, OffsetAndMetadata], consumerGroupId: String): Unit
/**
* Добавить прочитанные оффсеты в текущую транзакцию
*/
override def sendOffsetsToTransaction(offsets: util.Map[TopicPartition, OffsetAndMetadata], groupMetadata: ConsumerGroupMetadata): Unit