Публикация событий#
Для передачи событий через компонент EVTD необходимо реализовать отправку событий и их получение на стороне клиентов: producer на стороне отправителя и consumer на стороне получателя.
Публикация событий – процесс отправки сообщений в определенный топик определенного кластера Platform V Corax / Apache Kafka.
Для этого необходимо:
подготовить клиентский сертификат для подключения приложения к Platform V Corax / Apache Kafka;
определить кластер, в который будет производиться запись;
определить наименование топик, в который будут отправляться события в соответствии с правилом наименования:
<индентификатор системы-поставщика>.<тип события>EVENT.V<версия формата события>Пример: AS1.ENTITYCHANGEDEVENT.V1
запросить у администратора доступа права на создание топик с получившимся наименованием на кластере и выдачу на него прав на запись для полученного сертификата.
Шаги разработки:
Подключить в зависимости проекта библиотеку
"org.apache.kafka:kafka-clients:3.7.2".В классе произвести заполнение параметров подключения:
Properties props = new Properties();
props.put("bootstrap.servers", "{ IP_ADDRESS },{ IP_ADDRESS },{ IP_ADDRESS }");
props.put("acks", "all");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.identification.algorithm", "");
, где:
bootstrap.servers— список серверов кластера Platform V Corax / Apache Kafka с указанием портов через запятую;acks— необходимое количество ответов от брокеров для того, чтобы считать запись успешной.Возможные значения:
0 - ответ не ожидается; 1 - достаточно ответа от брокера-лидера; all - ожидается ответ от всех реплик, что они получили сообщение.batch.size— размер пачки в байтах, в которые будут группироваться события в случае отправки нескольких событий;linger.ms— количество миллисекунд, которое Producer ждет для получения возможных новых событий для пачки перед отправкой;buffer.memory- общее количество памяти, выделяемое для буферизации отправляемых событий;key.serializerиvalue.serializer— класс-сериализатор ключа и значения, переданных вProducerRecord. В комплекте с клиентом идут два стандартных классаByteArraySerializerиStringSerializer.
Добавить непосредственно отправку события:
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String,String> record = new ProducerRecord<String,String>("my-topic", key, value);
producer.send(record);
producer.close();
, где:
props— предзаполненные параметры подключения;my-topic— наименование топика, в который производится отправка;key— ключ события;value— текст события.
Отправка возможна одним из трех вариантов:
Блокирующий синхронный, с ожиданием ответа об успешной отправке: producer.send(record).get()
Асинхронный без ожидания ответа: producer.send(record)
Асинхронный с ожиданием ответа:
producer.send(record,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
})