Расширение логики работы перехватчиков#
В некоторых случаях может быть полезно расширить логику работы перехватчиков. Например, если требуется обработка сообщений перехватчиком только в случае, когда сообщение получено из определенного топика или содержит определенный заголовок.
Создание и конфигурация перехватчиков#
Создание и конфигурация перехватчиков для Kafka-consumer и Kafka-producer идентична и происходит следующим образом:
Kafka-клиент получает имя класса перехватчика из настройки
interceptor.classes.Kafka-клиент создает объект класса с использованием рефлексии, вызывая конструктор по умолчанию без параметров.
Kafka-клиент передает полную конфигурацию kafka-клиента перехватчика с помощью метода
configure():
public interface Configurable {
void configure(Map<String, ?> configs);
}
Переопределив метод configure() можно получить, изменить или добавить любые настройки перехватчика.
Расширение логики перехватчика для Kafka-producer#
В общем случае перехватчики для Kafka-producer реализуют стандартный интерфейс org.apache.kafka.clients.producer.ProducerInterceptor:
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);
void close();
}
Обработка сообщений перед отправкой происходит в методе
onSend(), в который передается сообщение типаProducerRecord. В сообщении доступны:топик;
партиция (если имеется);
ключ сообщения;
значение сообщения;
заголовки сообщения;
время (timestamp) сообщения.
Метод
onAcknowledgement()вызывается при получении Kafka-клиентом подтверждения отправки/ошибки при отправке каждого сообщения. Большинством перехватчиков метод не используется.Метод
close()используется для освобождения ресурсов, вызывается при остановке Kafka-клиента.
Расширение логики перехватчика для Kafka-consumer#
В общем случае перехватчики для Kafka-consumer реализуют стандартный интерфейc org.apache.kafka.clients.consumer.ConsumerInterceptor:
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
void close();
}
Обработка сообщений перед передачей в прикладной код происходит в методе
onConsume(), в который передается пачка сообщений типаConsumerRecords.Практически у каждого перехватчика определен метод
onConsume(ConsumerRecord)для обработки каждого сообщения из пачки, рекомендуется использовать его. В сообщении доступны:топик;
партиция (если имеется);
ключ сообщения;
значение сообщения;
заголовки сообщения;
время (timestamp) сообщения.
Метод
onCommit()вызывается при коммите сообщений клиентом. Большинством перехватчиков метод не используется.Метод
close()используется для освобождения ресурсов, вызывается при остановке Kafka-клиента.
Примеры расширения перехватчиков#
1. Перехватчик CertificateSignatureProducerInterceptor:
Пример расширения логики перехватчика CertificateSignatureProducerInterceptor для подписи только тех сообщений, которые отправляются в определенный топик:
package ru.sbt.ss.kafka.interceptors;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
* Пример наследования {@link CertificateSignatureProducerInterceptor} с добавлением настройки, позволяющей указать топик, при отправке в который сообщения будут подписаны.
*
* Intellij IDEA может подсвечивать имя класса ошибкой, связанной с методом interceptorCallback(), это проблема работы java-плагина со scala-классами, на компиляцию не повлияет.
*/
public class CustomCertificateSignatureProducerInterceptor<K, V> extends CertificateSignatureProducerInterceptor<K, V> {
public static final String TOPIC_CONFIG = "interceptor.signature.certificate.topic";
private String topic;
/**
* Метод обработки сообщения перехватчиком
* @param record сообщение, можно просматривать топик/ключ/тело/заголовки сообщения
* @return обработанное сообщение
*/
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
if (Objects.nonNull(topic) && topic.equals(record.topic())) {
return super.onSend(record); // вызвать обработку перехватчиком, в данном случае подпись сообщения
} else {
return record; // вернуть исходное сообщение, если обработка перехватчиком не требуется
}
}
/**
* Метод конфигурирования перехватчика
* @param configs конфигурация kafka-клиента, можно просматривать/изменять/добавлять настройки
*/
@Override
public void configure(Map<String, ?> configs) {
this.topic = Optional.ofNullable(configs.get(TOPIC_CONFIG))
.map(Object::toString)
.orElse(null);
super.configure(configs);
}
}
2. Перехватчик CertificateSignatureConsumerInterceptor:
Пример расширения логики перехватчика CertificateSignatureConsumerInterceptor для проверки подписи только у тех сообщений, которые были получены из определенного топика:
package ru.sbt.ss.kafka.interceptors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
/**
* Пример наследования {@link CertificateSignatureConsumerInterceptor} с добавлением настройки, позволяющей указать топик, при чтении из которого сообщения будут проверены на валидность подписи.
*
* Intellij IDEA может подсвечивать имя класса ошибкой, связанной с методом interceptorCallback(), это проблема работы java-плагина со scala-классами, на компиляцию не повлияет.
*/
public class CustomCertificateSignatureConsumerInterceptor<K, V> extends CertificateSignatureConsumerInterceptor<K, V> {
public static final String TOPIC_CONFIG = "interceptor.signature.certificate.topic";
private String topic;
/**
* Метод обработки пачки сообщений перехватчиком
* Практически в каждом перехватчике определен метод {@link #onConsume(ConsumerRecord)}, обрабатывающий одно сообщение, рекомендуется использовать его.
* @param records пачка сообщений, можно просматривать топик/партицию/ключ/тело/заголовки сообщения
* @return обработанная пачка сообщений
*/
@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
return super.onConsume(records);
}
/**
* Метод обработки одного сообщения перехватчиком
* @param record сообщение, можно просматривать топик/партицию/ключ/тело/заголовки сообщения
* @return обработанное сообщение
*/
@Override
public ConsumerRecord<K, V> onConsume(ConsumerRecord<K, V> record) {
if (Objects.nonNull(topic) && topic.equals(record.topic())) {
return super.onConsume(record); // вызвать обработку перехватчиком, в данном случае проверку подписи сообщения
} else {
return record; // вернуть исходное сообщение, если обработка перехватчиком не требуется
}
}
/**
* Метод конфигурирования перехватчика
* @param configs конфигурация kafka-клиента, можно просматривать/изменять/добавлять настройки
*/
@Override
public void configure(Map<String, ?> configs) {
this.topic = Optional.ofNullable(configs.get(TOPIC_CONFIG))
.map(Object::toString)
.orElse(null);
super.configure(configs);
}
}
Использование расширенных перехватчиков#
Для использования расширенных перехватчиков достаточно передать имя класса в конфигурацию kafka-клиента. Дополнительные расширенные настройки также передаются через конфигурацию Kafka-клиента.
1. Пример использования перехватчика CustomCertificateSignatureProducerInterceptor:
Map<String, Object> properties = Map.of(
"bootstrap.servers", "localhost:9092",
// ... другие стандартные настройки kafka-клиента
// имя класса расширенного перехватчика
"interceptor.classes", "ru.sbt.ss.kafka.interceptors.CustomCertificateSignatureProducerInterceptor",
// дополнительная настройка расширенного перехватчика
"interceptor.signature.certificate.topic", "topic"
// ... другие стандартные настройки перехватчика
);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
2. Пример использования перехватчика CustomCertificateSignatureConsumerInterceptor:
Map<String, Object> properties = Map.of(
"bootstrap.servers", "localhost:9092",
// ... другие стандартные настройки kafka-клиента
// имя класса расширенного перехватчика
"interceptor.classes", "ru.sbt.ss.kafka.interceptors.CustomCertificateSignatureConsumerInterceptor",
// дополнительная настройка расширенного перехватчика
"interceptor.signature.certificate.topic", "topic"
// ... другие стандартные настройки перехватчика
);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);