Быстрый старт#
Готовый вариант проекта json-extractor-interceptor.
Шаг 1. Подключить зависимость Kafka-клиент#
Для подключения зависимости в Gradle проект, необходимо в файл build.gradle добавить строку:
implementation 'org.apache.kafka:kafka-clients:4.2.0'
Для подключения зависимости в Maven проект, необходимо в файл pom.xml добавить в блок dependencies следующую зависимость:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>4.2.0</version>
</dependency>
Шаг 2. Подключить интерсептор#
Для подключения интерсептора, необходимо в файл build.gradle добавить строку:
implementation 'ru.sbt.ss:json-extractor-interceptor_2.13:4.2.0-1.2.0'
Для подключения интерсептора в Maven проект, необходимо в файл pom.xml добавить в блок dependencies следующую зависимость:
<dependency>
<groupId>ru.sbt.ss</groupId>
<artifactId>json-extractor-interceptor_2.13</artifactId>
<version>4.2.0-1.2.0</version>
</dependency>
Шаг 3. Настроить продюсер#
Настройка продюсера осуществляется в файле producer.properties:
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT
interceptor.classes = ru.sbt.ss.kafka.clients.producer.JsonExtractorInterceptor
interceptor.json.extractor.path = $['message']['id']
interceptor.json.extractor.header.name = MessageID
Шаг 4. Настроить консьюмер#
Настройка консьюмера осуществляется в файле consumer.properties:
group.id = test-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT
Шаг 5. Создание продюсера и консьюмера Kafka. Отправка и вычитка сообщения#
Создание продюсера и консьюмера, а также отправка и вычитка сообщений осуществляется в в файле *.java:
public class JsonExtractorInterceptorExample {
private static final String TOPIC = "PLAINTEXT";
private static final String MESSAGE = """
{
"message": {
"id": "someId",
"body": "messageBody"
}
}
""";
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
final var producerProperties = new Properties();
final var consumerProperties = new Properties();
final var consumerPropertiesStream = JsonExtractorInterceptorExample.class
.getClassLoader().getResourceAsStream("consumer.properties");
final var producerPropertiesStream = JsonExtractorInterceptorExample.class
.getClassLoader().getResourceAsStream("producer.properties");
producerProperties.load(producerPropertiesStream);
consumerProperties.load(consumerPropertiesStream);
try (final var kafkaProducer = new KafkaProducer<String, String>(producerProperties);
final var kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties)) {
final var recordMetadata = kafkaProducer.send(new ProducerRecord<>(TOPIC, MESSAGE)).get();
final var topicPartition = new TopicPartition(recordMetadata.topic(), recordMetadata.partition());
kafkaConsumer.assign(Collections.singletonList(topicPartition));
kafkaConsumer.seek(topicPartition, recordMetadata.offset());
final var records = kafkaConsumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("value: %s, headers: %s", record.value(), record.headers().toString());
}
}
}
}