Подписка на события#
Для передачи событий через компонент EVTD необходимо реализовать отправку событий и их получение на стороне клиентов: producer на стороне отправителя и consumer на стороне получателя.
Подписка на события – процесс получения сообщений из определенного topic определенного кластера Platform V Corax / Apache Kafka.
Предусловия:
Подготовить клиентский сертификат для подключения приложения к Platform V Corax / Apache Kafka.
Определить кластер, куда будут публиковаться события, к которым необходимо получить доступ.
Определить наименование topic, в который публикуются события, к которым необходимо получить доступ.
Запросить у администратора доступа выдачу прав на подписку к указанному topic для полученного сертификата.
Шаги разработки:
Примеры кода приводятся для языка Java.
Подключить в зависимости проекта библиотеку
org.apache.kafka:kafka-clients:2.4.0.В классе произвести заполнение параметров подключения:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.1:9093,192.168.1.2:9093,192.168.1.3:9093");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.identification.algorithm", "");
, где:
bootstrap.servers— список серверов кластера Platform V Corax / Apache Kafkaс указанием портов через запятую;group.id— наименования группы потребителя, все потребители в рамках одной группы считаются одним потребителем;enable.auto.commit— установка в true означает, что обработанные offsets коммитятся автоматически, интервал commit устанавливается в параметреauto.commit.interval.ms;key.deserializerиvalue.deserializer— класс-десериализатор ключа и значения, полученных в ConsumerRecords. В комплекте с клиентом идут два стандартных класса: ByteArrayDeserializer и StringDeserializer.
Добавить подключение к кластеру:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic", "your-topic"));
, где:
props— предзаполненные параметры подключения;my-topic,your-topic— наименование topics, к которым производится подключение.
Добавить непосредственно вычитку:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}