Публикация событий#

Для передачи событий через компонент EVTD необходимо реализовать отправку событий и их получение на стороне клиентов: продюсер на стороне отправителя и консьюмер на стороне получателя.

Публикация событий – процесс отправки сообщений в определенный топик определенного кластера Platform V Corax / Apache Kafka.

Для этого необходимо:

  • подготовить клиентский сертификат для подключения приложения к Platform V Corax / Apache Kafka;

  • определить кластер, в который будет производиться запись;

  • определить наименование топик, в который будут отправляться события в соответствии с правилом наименования: <индентификатор системы-поставщика>.<тип события>EVENT.V<версия формата события>

    Пример: AS1.ENTITYCHANGEDEVENT.V1

  • запросить у администратора доступа права на создание топик с получившимся наименованием на кластере и выдачу на него прав на запись для полученного сертификата.

Шаги разработки:

  1. Подключить в зависимости проекта библиотеку "org.apache.kafka:kafka-clients:3.7.2".

  2. В классе произвести заполнение параметров подключения:

Properties props = new Properties();
props.put("bootstrap.servers", "{ IP_ADDRESS },{ IP_ADDRESS },{ IP_ADDRESS }");
props.put("acks", "all");
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("security.protocol", "SSL");
props.put("ssl.keystore.location", "<полный путь до хранилища ключа>" );
props.put("ssl.truststore.location", "<полный путь до хранилища ключа>");
props.put("ssl.keystore.password", "<пароль хранилища ключа>");
props.put("ssl.truststore.password", "<пароль хранилища сертификата ЦС>");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.key.password", "<пароль закрытого ключа>");
props.put("ssl.endpoint.identification.algorithm", "");

, где:

  • bootstrap.servers — список серверов кластера Platform V Corax / Apache Kafka с указанием портов через запятую;

  • acks — необходимое количество ответов от брокеров для того, чтобы считать запись успешной.

    Возможные значения:

    0 - ответ не ожидается;
    
    1 - достаточно ответа от брокера-лидера;
    
    all - ожидается ответ от всех реплик, что они получили сообщение.
    
  • batch.size — размер пачки в байтах, в которые будут группироваться события в случае отправки нескольких событий;

  • linger.ms — количество миллисекунд, которое продюсер ждет для получения возможных новых событий для пачки перед отправкой;

  • buffer.memory - общее количество памяти, выделяемое для буферизации отправляемых событий;

  • key.serializer и value.serializer — класс-сериализатор ключа и значения, переданных в ProducerRecord. В комплекте с клиентом идут два стандартных класса ByteArraySerializer и StringSerializer.

  1. Добавить непосредственно отправку события:

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String,String> record = new ProducerRecord<String,String>("my-topic", key, value);
producer.send(record);
producer.close();

, где:

  • props — предзаполненные параметры подключения;

  • my-topic — наименование топика, в который производится отправка;

  • key — ключ события;

  • value — текст события.

Отправка возможна одним из трех вариантов:

Блокирующий синхронный, с ожиданием ответа об успешной отправке: producer.send(record).get()

Асинхронный без ожидания ответа: producer.send(record)

Асинхронный с ожиданием ответа:

  producer.send(record,
  new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
  if(e != null) {
  e.printStackTrace();
  } else {
  System.out.println("The offset of the record we just sent is: " + metadata.offset());
  }
  }
  })