Руководство прикладного разработчика#
Системные требования#
Зависят от выбранного языка реализации клиентской библиотеки.
Подключение и конфигурирование#
Перед разработкой необходимо подключить к проекту необходимые зависимости:
Для Java:
org.apache.kafka:kafka-clients:2.4.0
Для Python: kafka-python
Для C/C++: librdkafka
Для Go(GoLang): sarama
Для .Net: confluent-kafka-dotnet
Миграция на текущую версию#
Общий подход#
Сохраняется совместимость с предыдущими версиями.
Разработка первого приложения с использованием программного продукта#
Для передачи событий через Kafka необходимо реализовать отправку и получение на стороне клиентов: продюсер на стороне отправителя и консьюмер на стороне получателя.
Публикация событий#
Публикация событий - процесс отправки сообщений в определенный топик определенного кластера Apache Kafka.
Пререквизиты:
Подготовить клиентский сертификат для подключения приложения к Apache Kafka.
Определить кластер Apache Kafka, в который будет производиться запись.
Определить наименование топика, в который будут отправляться события в соответствии с правилом наименования: <индентификатор системы-поставщика>.<тип события>EVENT.V<версия формата события>
AS1.ENTITYCHANGEDEVENT.V1
Запросить у администратора доступа создание топика с получившимся наименованием на кластере и выдачу на него прав на запись для полученного сертификата.
Шаги разработки:
Примеры кода приводятся для языка Java.
Подключить в зависимости проекта библиотеку
"org.apache.kafka:kafka-clients:2.4.0".В классе произвести заполнение параметров подключения:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.1:9093,192.168.1.2:9093,192.168.1.3:9093");
props.put("acks", "all");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.dentification.algorithm", "");
где
bootstrap.servers- список серверов кластера Apache Kafka с указанием портов через запятую;acks- необходимое количество ответов от брокеров Apache Kafka, для того чтобы считать запись успешной.Возможные значения:
0 - ответ не ожидается; 1 - достаточно ответа от брокера-лидера; all - ожидается ответ от всех реплик что они получили сообщение.batch.size- размер пачки в байтах, в которые будут группироваться события в случае отправки нескольких событий.linger.ms- количество миллисекунд, которое producer ждет получения возможных новых событий для пачки перед отправкой.buffer.memory- общее количество памяти выделяем для буферизации отправляемых событий.key.serializerиvalue.serializer- классы-сериализатор ключа и значения переданных в ProducerRecord. В комплекте с клиентом идут два стандартных класса: ByteArraySerializer и StringSerializer.
Добавить непосредственно отправку события:
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String,String> record = new ProducerRecord<String,String>("my-topic", key, value);
producer.send(record);
producer.close();
где
props- предзаполненные параметры подключения;my-topic- наименование топика в который производится отправка;
key - ключ события;
value - текст события.
Непосредственно отправка возможна одним из трех вариантов:
Блокирующий синхронный, с ожиданием ответа об успешной отправке:
producer.send(record).get();Асинхронный без ожидания ответа:
producer.send(record)Асинхронный с ожиданием ответа:
producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } });
Подписка на события#
Подписка на события - процесс получения сообщений из определенного топика определенного кластера Apache Kafka.
Пререквизиты:
Подготовить клиентский сертификат для подключения приложения к Apache Kafka.
Определить кластер Apache Kafka, в который публикуются события, к которым необходимо получить доступ.
Определить наименование топика, в который публикуются события, к которым необходимо получить доступ.
Запросить у администратора доступа выдачу прав на подписку к указанному топику для полученного сертификата.
Шаги разработки:
Примеры кода приводятся для языка Java.
Подключить в зависимости проекта библиотеку
org.apache.kafka:kafka-clients:2.4.0.В классе произвести заполнение параметров подключения:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.1:9093,192.168.1.2:9093,192.168.1.3:9093");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.dentification.algorithm", "");
где
bootstrap.servers- список серверов кластера Apache Kafka с указанием портов через запятую;group.id- наименования группы потребителя, все потребители в рамках одной группы считаются одним потребителем;enable.auto.commit- установка в true означает что обработанные оффсеты коммитятся автоматически, интервал коммита устанавливается в параметреauto.commit.interval.ms;key.deserializerиvalue.deserializer- класс-десериализатор ключа и значения полученных в ConsumerRecords. В комплекте с клиентом идут два стандартных класса: ByteArrayDeserializer и StringDeserializer.
Добавить подключение к кластеру:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic", "your-topic"));
где
props- предзаполненные параметры подключения;my-topic,your-topic- наименование топиков к которым производится подключение.
Добавить непосредственно вычитку:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Использование программного продукта#
Основной функционал продукта - предоставление транспортного слоя и возможностей отправки и получения событий.
Дополнительно предоставляется функционал классов-перехватчиков, которые отрабатывают перед передачей управления в основной код, и входят в стандартные механизмы подключения к Apache Kafka.
Общей задачей классов-перехватчиков служит обработка каждой исходящей/входящей записи на уровне транспорта, до передачи ее в прикладной код. Таким образом осуществляется независимая от прикладного кода функциональность транспортного слоя. Вмешательство в работу перехватчика со стороны прикладного кода, если перехватчик загружен в клиента kafka, невозможно. Только переинициализация клиента с выгрузкой перехватчика.
Часто встречающиеся проблемы и пути их устранения#
Не выявлено.