Подписка на события#

Для передачи событий через компонент EVTD необходимо реализовать отправку событий и их получение на стороне клиентов: producer на стороне отправителя и consumer на стороне получателя.

Подписка на события – процесс получения сообщений из определенного topic определенного кластера Platform V Corax / Apache Kafka.

Предусловия:

  1. Подготовить клиентский сертификат для подключения приложения к Platform V Corax / Apache Kafka.

  2. Определить кластер, куда будут публиковаться события, к которым необходимо получить доступ.

  3. Определить наименование topic, в который публикуются события, к которым необходимо получить доступ.

  4. Запросить у администратора доступа выдачу прав на подписку к указанному topic для полученного сертификата.

Шаги разработки:

Примеры кода приводятся для языка Java.

  1. Подключить в зависимости проекта библиотеку org.apache.kafka:kafka-clients:2.4.0.

  2. В классе произвести заполнение параметров подключения:

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.1:9093,192.168.1.2:9093,192.168.1.3:9093");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.identification.algorithm", "");

, где:

  • bootstrap.servers — список серверов кластера Platform V Corax / Apache Kafkaс указанием портов через запятую;

  • group.id — наименования группы потребителя, все потребители в рамках одной группы считаются одним потребителем;

  • enable.auto.commit — установка в true означает, что обработанные offsets коммитятся автоматически, интервал commit устанавливается в параметре auto.commit.interval.ms;

  • key.deserializer и value.deserializer— класс-десериализатор ключа и значения, полученных в ConsumerRecords. В комплекте с клиентом идут два стандартных класса: ByteArrayDeserializer и StringDeserializer.

  1. Добавить подключение к кластеру:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic", "your-topic"));

, где:

  • props — предзаполненные параметры подключения;

  • my-topic, your-topic — наименование topics, к которым производится подключение.

  1. Добавить непосредственно вычитку:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }