Быстрый старт#

Готовый вариант проекта json-extractor-interceptor.

Шаг 1. Подключить зависимость Kafka-клиент#

Для подключения зависимости в Gradle проект, необходимо в файл build.gradle добавить строку:

implementation 'org.apache.kafka:kafka-clients:4.2.0'

Для подключения зависимости в Maven проект, необходимо в файл pom.xml добавить в блок dependencies следующую зависимость:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.2.0</version>
</dependency>

Шаг 2. Подключить интерсептор#

Для подключения интерсептора, необходимо в файл build.gradle добавить строку:

implementation 'ru.sbt.ss:json-extractor-interceptor_2.13:4.2.0-1.2.0'

Для подключения интерсептора в Maven проект, необходимо в файл pom.xml добавить в блок dependencies следующую зависимость:

<dependency>
    <groupId>ru.sbt.ss</groupId>
    <artifactId>json-extractor-interceptor_2.13</artifactId>
    <version>4.2.0-1.2.0</version>
</dependency>

Шаг 3. Настроить продюсер#

Настройка продюсера осуществляется в файле producer.properties:

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT

interceptor.classes = ru.sbt.ss.kafka.clients.producer.JsonExtractorInterceptor
interceptor.json.extractor.path = $['message']['id']
interceptor.json.extractor.header.name = MessageID

Шаг 4. Настроить консьюмер#

Настройка консьюмера осуществляется в файле consumer.properties:

group.id = test-group

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT

Шаг 5. Создание продюсера и консьюмера Kafka. Отправка и вычитка сообщения#

Создание продюсера и консьюмера, а также отправка и вычитка сообщений осуществляется в в файле *.java:

public class JsonExtractorInterceptorExample {

    private static final String TOPIC = "PLAINTEXT";

    private static final String MESSAGE = """
{
  "message": {
    "id": "someId",
    "body": "messageBody"
  }
}
            """;

    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
        final var producerProperties = new Properties();
        final var consumerProperties = new Properties();

        final var consumerPropertiesStream = JsonExtractorInterceptorExample.class
                .getClassLoader().getResourceAsStream("consumer.properties");
        final var producerPropertiesStream = JsonExtractorInterceptorExample.class
                .getClassLoader().getResourceAsStream("producer.properties");

        producerProperties.load(producerPropertiesStream);
        consumerProperties.load(consumerPropertiesStream);

        try (final var kafkaProducer = new KafkaProducer<String, String>(producerProperties);
             final var kafkaConsumer = new KafkaConsumer<String, String>(consumerProperties)) {
            final var recordMetadata = kafkaProducer.send(new ProducerRecord<>(TOPIC, MESSAGE)).get();

            final var topicPartition = new TopicPartition(recordMetadata.topic(), recordMetadata.partition());

            kafkaConsumer.assign(Collections.singletonList(topicPartition));
            kafkaConsumer.seek(topicPartition, recordMetadata.offset());

            final var records = kafkaConsumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("value: %s, headers: %s", record.value(), record.headers().toString());
            }
        }
    }
}