Руководство прикладного разработчика#

Системные требования#

Зависят от выбранного языка реализации клиентской библиотеки.

Подключение и конфигурирование#

Перед разработкой необходимо подключить к проекту необходимые зависимости:

Для Java: org.apache.kafka:kafka-clients:2.4.0

Для Python: kafka-python

Для C/C++: librdkafka

Для Go(GoLang): sarama

Для .Net: confluent-kafka-dotnet

Миграция на текущую версию#

Общий подход#

Сохраняется совместимость с предыдущими версиями.

Разработка первого приложения с использованием программного продукта#

Для передачи событий через Kafka необходимо реализовать отправку и получение на стороне клиентов: продюсер на стороне отправителя и консьюмер на стороне получателя.

Публикация событий#

Публикация событий - процесс отправки сообщений в определенный топик определенного кластера Apache Kafka.

Пререквизиты:

  1. Подготовить клиентский сертификат для подключения приложения к Apache Kafka.

  2. Определить кластер Apache Kafka, в который будет производиться запись.

  3. Определить наименование топика, в который будут отправляться события в соответствии с правилом наименования: <индентификатор системы-поставщика>.<тип события>EVENT.V<версия формата события>

AS1.ENTITYCHANGEDEVENT.V1

  1. Запросить у администратора доступа создание топика с получившимся наименованием на кластере и выдачу на него прав на запись для полученного сертификата.

Шаги разработки:

Примеры кода приводятся для языка Java.

  1. Подключить в зависимости проекта библиотеку "org.apache.kafka:kafka-clients:2.4.0".

  2. В классе произвести заполнение параметров подключения:

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.1:9093,192.168.1.2:9093,192.168.1.3:9093");
props.put("acks", "all");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.dentification.algorithm", "");

где

  • bootstrap.servers - список серверов кластера Apache Kafka с указанием портов через запятую;

  • acks - необходимое количество ответов от брокеров Apache Kafka, для того чтобы считать запись успешной.

    Возможные значения:

    0 - ответ не ожидается;
    
    1 - достаточно ответа от брокера-лидера;
    
    all - ожидается ответ от всех реплик что они получили сообщение.
    
  • batch.size - размер пачки в байтах, в которые будут группироваться события в случае отправки нескольких событий.

  • linger.ms - количество миллисекунд, которое producer ждет получения возможных новых событий для пачки перед отправкой.

  • buffer.memory - общее количество памяти выделяем для буферизации отправляемых событий.

  • key.serializer и value.serializer- классы-сериализатор ключа и значения переданных в ProducerRecord. В комплекте с клиентом идут два стандартных класса: ByteArraySerializer и StringSerializer.

  1. Добавить непосредственно отправку события:

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String,String> record = new ProducerRecord<String,String>("my-topic", key, value); 
producer.send(record);
producer.close();

где

  • props - предзаполненные параметры подключения;

  • my-topic - наименование топика в который производится отправка;

key - ключ события;

value - текст события.

Непосредственно отправка возможна одним из трех вариантов:

  • Блокирующий синхронный, с ожиданием ответа об успешной отправке:
    producer.send(record).get();

  • Асинхронный без ожидания ответа:
    producer.send(record)

  • Асинхронный с ожиданием ответа:
    producer.send(record,               new Callback() {                  public void onCompletion(RecordMetadata metadata, Exception e) {                      if(e != null) {                         e.printStackTrace();                      } else {                         System.out.println("The offset of the record we just sent is: " + metadata.offset());                      }                  }              });

Подписка на события#

Подписка на события - процесс получения сообщений из определенного топика определенного кластера Apache Kafka.

Пререквизиты:

  1. Подготовить клиентский сертификат для подключения приложения к Apache Kafka.

  2. Определить кластер Apache Kafka, в который публикуются события, к которым необходимо получить доступ.

  3. Определить наименование топика, в который публикуются события, к которым необходимо получить доступ.

  4. Запросить у администратора доступа выдачу прав на подписку к указанному топику для полученного сертификата.

Шаги разработки:

Примеры кода приводятся для языка Java.

  1. Подключить в зависимости проекта библиотеку org.apache.kafka:kafka-clients:2.4.0.

  2. В классе произвести заполнение параметров подключения:

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.1:9093,192.168.1.2:9093,192.168.1.3:9093");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.dentification.algorithm", "");

где

  • bootstrap.servers - список серверов кластера Apache Kafka с указанием портов через запятую;

  • group.id - наименования группы потребителя, все потребители в рамках одной группы считаются одним потребителем;

  • enable.auto.commit - установка в true означает что обработанные оффсеты коммитятся автоматически, интервал коммита устанавливается в параметре auto.commit.interval.ms;

  • key.deserializer и value.deserializer- класс-десериализатор ключа и значения полученных в ConsumerRecords. В комплекте с клиентом идут два стандартных класса: ByteArrayDeserializer и StringDeserializer.

  1. Добавить подключение к кластеру:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic", "your-topic"));

где

  • props - предзаполненные параметры подключения;

  • my-topic, your-topic - наименование топиков к которым производится подключение.

  1. Добавить непосредственно вычитку:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }

Использование программного продукта#

Основной функционал продукта - предоставление транспортного слоя и возможностей отправки и получения событий.

Дополнительно предоставляется функционал классов-перехватчиков, которые отрабатывают перед передачей управления в основной код, и входят в стандартные механизмы подключения к Apache Kafka.

Общей задачей классов-перехватчиков служит обработка каждой исходящей/входящей записи на уровне транспорта, до передачи ее в прикладной код. Таким образом осуществляется независимая от прикладного кода функциональность транспортного слоя. Вмешательство в работу перехватчика со стороны прикладного кода, если перехватчик загружен в клиента kafka, невозможно. Только переинициализация клиента с выгрузкой перехватчика.

Часто встречающиеся проблемы и пути их устранения#

Не выявлено.