Расширение логики работы перехватчиков#

В некоторых случаях может быть полезно расширить логику работы перехватчиков. Например, если требуется обработка сообщений перехватчиком только в случае, когда сообщение получено из определенного топика или содержит определенный заголовок.

Создание и конфигурация перехватчиков#

Создание и конфигурация перехватчиков для Kafka-consumer и Kafka-producer идентична и происходит следующим образом:

  1. Kafka-клиент получает имя класса перехватчика из настройки interceptor.classes.

  2. Kafka-клиент создает объект класса с использованием рефлексии, вызывая конструктор по умолчанию без параметров.

  3. Kafka-клиент передает полную конфигурацию kafka-клиента перехватчика с помощью метода configure():

public interface Configurable {
   void configure(Map<String, ?> configs);
}

Переопределив метод configure() можно получить, изменить или добавить любые настройки перехватчика.

Расширение логики перехватчика для Kafka-producer#

В общем случае перехватчики для Kafka-producer реализуют стандартный интерфейс org.apache.kafka.clients.producer.ProducerInterceptor:

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    void close();
}
  • Обработка сообщений перед отправкой происходит в методе onSend(), в который передается сообщение типа ProducerRecord. В сообщении доступны:

    • топик;

    • партиция (если имеется);

    • ключ сообщения;

    • значение сообщения;

    • заголовки сообщения;

    • время (timestamp) сообщения.

  • Метод onAcknowledgement() вызывается при получении Kafka-клиентом подтверждения отправки/ошибки при отправке каждого сообщения. Большинством перехватчиков метод не используется.

  • Метод close() используется для освобождения ресурсов, вызывается при остановке Kafka-клиента.

Расширение логики перехватчика для Kafka-consumer#

В общем случае перехватчики для Kafka-consumer реализуют стандартный интерфейc org.apache.kafka.clients.consumer.ConsumerInterceptor:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

    void close();
}
  • Обработка сообщений перед передачей в прикладной код происходит в методе onConsume(), в который передается пачка сообщений типа ConsumerRecords.

  • Практически у каждого перехватчика определен метод onConsume(ConsumerRecord) для обработки каждого сообщения из пачки, рекомендуется использовать его. В сообщении доступны:

    • топик;

    • партиция (если имеется);

    • ключ сообщения;

    • значение сообщения;

    • заголовки сообщения;

    • время (timestamp) сообщения.

  • Метод onCommit() вызывается при коммите сообщений клиентом. Большинством перехватчиков метод не используется.

  • Метод close() используется для освобождения ресурсов, вызывается при остановке Kafka-клиента.

Примеры расширения перехватчиков#

1. Перехватчик CertificateSignatureProducerInterceptor:

Пример расширения логики перехватчика CertificateSignatureProducerInterceptor для подписи только тех сообщений, которые отправляются в определенный топик:

package ru.sbt.ss.kafka.interceptors;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Пример наследования {@link CertificateSignatureProducerInterceptor} с добавлением настройки, позволяющей указать топик, при отправке в который сообщения будут подписаны.
 *
 * Intellij IDEA может подсвечивать имя класса ошибкой, связанной с методом interceptorCallback(), это проблема работы java-плагина со scala-классами, на компиляцию не повлияет.
 */
public class CustomCertificateSignatureProducerInterceptor<K, V> extends CertificateSignatureProducerInterceptor<K, V> {

   public static final String TOPIC_CONFIG = "interceptor.signature.certificate.topic";

   private String topic;

   /**
    * Метод обработки сообщения перехватчиком
    * @param record сообщение, можно просматривать топик/ключ/тело/заголовки сообщения
    * @return обработанное сообщение
    */
   @Override
   public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
      if (Objects.nonNull(topic) && topic.equals(record.topic())) {
         return super.onSend(record); // вызвать обработку перехватчиком, в данном случае подпись сообщения
      } else {
         return record; // вернуть исходное сообщение, если обработка перехватчиком не требуется
      }
   }

   /**
    * Метод конфигурирования перехватчика
    * @param configs конфигурация kafka-клиента, можно просматривать/изменять/добавлять настройки
    */
   @Override
   public void configure(Map<String, ?> configs) {
      this.topic = Optional.ofNullable(configs.get(TOPIC_CONFIG))
              .map(Object::toString)
              .orElse(null);

      super.configure(configs);
   }
}

2. Перехватчик CertificateSignatureConsumerInterceptor:

Пример расширения логики перехватчика CertificateSignatureConsumerInterceptor для проверки подписи только у тех сообщений, которые были получены из определенного топика:

package ru.sbt.ss.kafka.interceptors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Пример наследования {@link CertificateSignatureConsumerInterceptor} с добавлением настройки, позволяющей указать топик, при чтении из которого сообщения будут проверены на валидность подписи.
 *
 * Intellij IDEA может подсвечивать имя класса ошибкой, связанной с методом interceptorCallback(), это проблема работы java-плагина со scala-классами, на компиляцию не повлияет.
 */
public class CustomCertificateSignatureConsumerInterceptor<K, V> extends CertificateSignatureConsumerInterceptor<K, V> {

   public static final String TOPIC_CONFIG = "interceptor.signature.certificate.topic";

   private String topic;

   /**
    * Метод обработки пачки сообщений перехватчиком
    * Практически в каждом перехватчике определен метод {@link #onConsume(ConsumerRecord)}, обрабатывающий одно сообщение, рекомендуется использовать его.
    * @param records пачка сообщений, можно просматривать топик/партицию/ключ/тело/заголовки сообщения
    * @return обработанная пачка сообщений
    */
   @Override
   public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
      return super.onConsume(records);
   }

   /**
    * Метод обработки одного сообщения перехватчиком
    * @param record сообщение, можно просматривать топик/партицию/ключ/тело/заголовки сообщения
    * @return обработанное сообщение
    */
   @Override
   public ConsumerRecord<K, V> onConsume(ConsumerRecord<K, V> record) {
      if (Objects.nonNull(topic) && topic.equals(record.topic())) {
         return super.onConsume(record); // вызвать обработку перехватчиком, в данном случае проверку подписи сообщения
      } else {
         return record; // вернуть исходное сообщение, если обработка перехватчиком не требуется
      }
   }

   /**
    * Метод конфигурирования перехватчика
    * @param configs конфигурация kafka-клиента, можно просматривать/изменять/добавлять настройки
    */
   @Override
   public void configure(Map<String, ?> configs) {
      this.topic = Optional.ofNullable(configs.get(TOPIC_CONFIG))
              .map(Object::toString)
              .orElse(null);

      super.configure(configs);
   }
}

Использование расширенных перехватчиков#

Для использования расширенных перехватчиков достаточно передать имя класса в конфигурацию kafka-клиента. Дополнительные расширенные настройки также передаются через конфигурацию Kafka-клиента.

1. Пример использования перехватчика CustomCertificateSignatureProducerInterceptor:

		Map<String, Object> properties = Map.of(
				"bootstrap.servers", "localhost:9092",
				// ... другие стандартные настройки kafka-клиента
				
				// имя класса расширенного перехватчика
				"interceptor.classes", "ru.sbt.ss.kafka.interceptors.CustomCertificateSignatureProducerInterceptor",
				// дополнительная настройка расширенного перехватчика
				"interceptor.signature.certificate.topic", "topic"
				
				// ... другие стандартные настройки перехватчика
		);

		KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

2. Пример использования перехватчика CustomCertificateSignatureConsumerInterceptor:

		Map<String, Object> properties = Map.of(
        "bootstrap.servers", "localhost:9092",
        // ... другие стандартные настройки kafka-клиента

        // имя класса расширенного перехватчика
        "interceptor.classes", "ru.sbt.ss.kafka.interceptors.CustomCertificateSignatureConsumerInterceptor",
        // дополнительная настройка расширенного перехватчика
        "interceptor.signature.certificate.topic", "topic"

        // ... другие стандартные настройки перехватчика
        );

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);