Kafka-clients с подключением к нескольким кластерам Kafka#

Producer#

Класс ru.sbt.ss.kafka.clients.producer.KafkaProducer реализует стандартный интерфейс: org.apache.kafka.clients.producer.Producer.

Дополнительные параметры конфигурации находятся в классе ru.sbt.ss.kafka.clients.producer.ExtendedProducerConfig.

Внутри используется несколько kafka-producers в зависимости от конфигурации. Вызовы методов клиента делегируются producers в зависимости от выбранной стратегии.

  1. Стратегия Fallback:

  • сообщения отправляются первым активным producer из списка до получения ошибки записи;

  • если активный producer получает ошибку записи — клиент переключается на использование следующего активного producer в списке, а текущий producer становится неактивным;

  • спустя таймаут, настраиваемый с помощью параметра retry.delay (по умолчанию: 60000 мс) producer восстановится и клиент снова переключится на него;

  • в качестве проверки соединения с кластером kafka используется метод partitionsFor() — получение списка партиций для topic. Topic можно задать параметром healthcheck.topic (по умолчанию системный topic: _consumer_offsets);

  • если все producers получили ошибку записи и не успели восстановиться — будет выбрасывается исключение «All clients failed exception.» пока любой из клиентов не восстановится.

  1. Стратегия Balance-load:

  • сообщения отправляются активными producers по алгоритму round-robin;

  • если какой-либо из producer получает ошибку записи — этот producer становится неактивным, а сообщения распределяются между остальными активными producers;

  • спустя таймаут, настраиваемый с помощью параметра retry.delay (по умолчанию: 60000 мс) producer восстановится и снова включится в список балансировки;

  • в качестве проверки соединения с кластером kafka используется метод partitionsFor() — получение списка партиций для topic. Topic можно задать параметром ExtendedProducerConfig.HEALTH_CHECK_TOPIC_CONFIG (по умолчанию системный topic _consumer_offsets);

  • если все producers получили ошибку записи и не успели восстановиться — будет выбрасывается исключение «All clients failed exception.» пока любой из клиентов не восстановится;

  • балансировку сообщений между клиентами можно настроить параметрами .messages.per.cycle.

Для управления внутренними producers используется система акторов akka.

Дополнительная конфигурация producer#

Клиент поддерживает все доступные параметры стандартного kafka-producer, а также следующие дополнительные параметры (которые также описаны в классе ru.sbt.ss.kafka.clients.producer.ExtendedProducerConfig):

## Список имен используемых producers
# Для каждого клиента можно задать отдельные параметры, если в префиксе параметра указать его имя, например:
# client1.bootstrap.servers = localhost:9093
clients = producer1, producer2

## Стратегия распределения сообщений между producers (fallback или balance-load)
strategy = fallback

## Настройка балансировки сообщений между producers при использовании стратегии balance-load
# По умолчанию messages.per.cycle=1 для всех клиентов — равномерное распределние сообщений по алгоритму round-robin.
# Например для 75%/25% балансировки можно отправлять 3 сообщения первому producer и 1 сообщение второму producer за одну итерацию.
# producer1.messages.per.cycle = 3
# producer2.messages.per.cycle = 1

## Задержка в мс, после которой неактивный producer (после ошибки записи) предпримет попытку восстановления
retry.delay = 60000

## Использовать healthcheck (получения списка партциий методом partitionsFor()) при попытке восстаноления producer.
# Если вызов вернет ошибку — клиент не восстановится и предпримет следущую попытку восстанвления через интервал retry.delay
healthcheck.on.retry.enabled = true

## Настройка topic для healthcheck (получения списка партциий методом partitionsFor())
healthcheck.topic = _consumer_offsets

## Настройка количества попыток переотправки сообщения разными продьсерами, по умолчанию равно количество producers.
send.retries = 2

## Максимальное количество сообщений, которое могут одновременно находиться в обработке клиентом.
# Используется для ограничения используемой памяти, аналогично настройке producer buffer.memory.
# При превышении лимита запись (вызов producer.send()) заблокируется, пока не освободится место в очереди (queue.unlock.threshold) или не истечет таймаут entrypoint.max.block.ms
max.queue.size = 15000

## Коэффициент заполненности очереди сообщений max.queue.size, при котором разблокируется запись, если она ранее была заблокирована из-за превышения max.queue.size
# Для очереди по умолчанию 15000 * 0.75 — запись разблокируется когда в очереди останется 11250 сообщений
queue.unlock.threshold = 0.75

## Аналогично стандартной натройке max.block.ms — если запись заблокирована более этого таймаута — будет выброшено исключение.
# По умолчанию сумма стандартных параметров max.block.ms и delivery.timeout всех producers.
entrypoint.max.block.ms = 360000

## Путь до файла с конфигурацией akka
# Пример настройки с значениями по умолчанию находится в ресурсах приложения `resources/producer-reference.conf`
akka.conf = path/to/akka.conf

## Имя системы акторов akka (среды исполнения)
# Несколько клиентов могут ипользовать одну среду исполнения akka по имени.
actor.system.name = producer-actor-system

## Таймаут остановки среды исполнения akka в мс, если она не используется ни одним клиентом
actor.system.shutdown.delay.ms = 1800000

## Таймаут завершения producer.
shutdown.timeout.ms = 5000

Пример использования#

package ru.sbt.ss.kafka.clients.producer;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import ru.sbt.ss.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Map;

// Пример использования producer с подключением к двум кластерам и стратегией fallback
public class HighAvailabilityProducerExample {
	public static void main(String[] args) {
		Map<String, Object> configuration = Map.of(
				// Стратегия использования клиентов
				ExtendedProducerConfig.STRATEGY_CONFIG, ExtendedProducerConfig.FALLBACK_STRATEGY,
				// Список имен используемых клиентов
				ExtendedProducerConfig.CLIENTS_CONFIG, "client1, client2",

				// Общие параметры для всех клиентов, стандартные настройки
				ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName(),
				ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName(),

				// Отдельные настройки для клиента с именем client1
				"client1." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-1:9092",
				// Отдельные настройки для клиента с именем client2
				"client2." + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-2:9092"

				// Отдельные настройки для клиентов можно вынести в отдельный файл
				// В файлах настройки указываются без префикса с именем клиента
//				"client1." + ExtendedProducerConfig.CLIENT_CONFIGURATION_CONFIG, "path/to/client-1.properties",
//				"client2." + ExtendedProducerConfig.CLIENT_CONFIGURATION_CONFIG, "path/to/client-2.properties"
		);

		// producer используется c с помощью стандартного интерфейса org.apache.kafka.clients.producer.Producer
		Producer<String, String> producer = new KafkaProducer<>(configuration);
		ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

        // Сообщение отправится первым активным producer
		producer.send(record).get();
	}
}

Список методов клиента#

/**
* Отправить сообщение с помощью одного из активных producer в зависимости от выбранной стратегии.
* В ответе RecordMetadata в имя topic будет добавлен префикс с именем клиента, отправившего сообщение.
* Порядок отправки сообщений может быть нарушен при переключении внутренних producers.
*/
send(record: ProducerRecord[K, V]): Future[RecordMetadata]

/**
* Отправить сообщение с помощью одного из активных producers в зависимости от выбранной стратегии.
* В ответе RecordMetadata в имя topic будет добавлен префикс с именем клиента, отправившего сообщение.
* Порядок отправки сообщений может быть нарушен при переключении внутренних producers.
*/
send(record: ProducerRecord[K, V], callback: Callback): Future[RecordMetadata]

/**
* Заблокировать поток до тех пор, пока все сообщения не будут отправлены.
*
* В отличии от оригинального метода:
* 1. Дождется отправки даже тех сообщений, которые были отправлены после вызова метода в другом потоке.
* 2. Не вызывает оригинальный метод flush() на всех producers, т.е. producers дождутся linger.ms перед отправкой пачек.
*/
flush(): Unit

/**
* Инициализировать транзакционный режим на всех producers. 
*/
initTransactions(): Unit

/**
* Открыть новую транзакцию на активном пproducer. Замена producers в транзакционном режиме происходит только во время исполнения этого метода.
*/
beginTransaction(): Unit

/**
* Подтвердить (commit) текущую транзакцию.
*
* При получении ошибки активный producer не меняется - можно попробовать commit еще раз или вызвать метод abortTransaction()
*
* Также возможно вызвать метод beginTransaction(), в таком случае новая транзакця инициализируется на новом producer, текущая транзакция будет потеряна.
*/
commitTransaction(): Unit = producers.commitTransaction()

/**
* Откатить текущую транзакцию.
*
* При получении ошибки активный producer не меняется - можно попробовать откатить транщакцию еще раз.
* Также возможно вызвать метод beginTransaction(), в таком случае новая транзакця инициализируется на новом пproducer, текущая транзакция будет потеряна.
*/
abortTransaction(): Unit

/**
* Закрыть producer
*/
close(): Unit

/**
* Закрыть producer
*/
close(timeout: Duration): Unit

Нереализованные методы#

/**
* Получить список партиций
*/
partitionsFor(topic: String): util.List[PartitionInfo]

/**
* Получить метрики клиента
*/ 
metrics(): util.Map[MetricName, _ <: Metric]

/**
* Добавить прочитанные оффсеты в текущую транзакцию
*/ 
sendOffsetsToTransaction(offsets: util.Map[TopicPartition, OffsetAndMetadata], consumerGroupId: String): Unit

/**
* Добавить прочитанные оффсеты в текущую транзакцию
*/ 
override def sendOffsetsToTransaction(offsets: util.Map[TopicPartition, OffsetAndMetadata], groupMetadata: ConsumerGroupMetadata): Unit