Kafka console tools#

Содержит консольные утилиты kafka-console-producer/kafka-console-consumer, аналогичные утилитам из дистрибутива kafka, для тестирования перехватчиков. Также позволяют добавить дополнительную директорию с библиотеками в classpath при запуске с помощью аргумента --add-classpath.

Утилиты не будут работать с -shadow версиями перехватчиков, т.к. тоже используют scala

Использование#

Аналогично утилитам из дистрибутива kafka:

  • bin/kafka-console-producer.sh

  • bin/kafka-console-consumer.sh

Подключение дополнительных библиотек#

Первым аргументом при запуске можно передать дополнительную директорию с библиотеками:

bin/kafka-consumer-consumer.sh --add-classpath <относительный/путь/до/директории/с/библиотеками>

Например, при использовании из распакованного дистрибутива kafka-plugins-dist:

- kafka-plugins-dist/
    - interceptors/
        - validator-interceptor/
        - ...
    - kafka-console-tools/
        - bin/
        - config/
        - libs/

Можно добавить в classpath зависимости для тестирования validator-interceptor при запуске: bin/kafka-consumer-consumer.sh --add-classpath ../interceptors/validator-interceptor

Для корректной работы аргумент --add-classpath обязательно должен быть на первом месте, сразу после bin/kafka-consumer-*.sh

Логирование#

Настройки логирования отдельные для producer/consumer находятся по пути config/consumer-logback.xml и config/producer-logback.xml. По умолчанию логи пишутся в файлы logs/kafka-console-consumer.log и logs/kafka-console-producer.log.

Утилиты используют STDOUT для ввода/вывода сообщений, поэтому для упрощения логи kafka-клиентов пишутся только в файлы.

Примеры конфигураций#

Пример конфигурации без перехватчика:

bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Пример конфигурации с перехватчиком (Kafka-ott-signature-interceptor):

bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
interceptor.classes = ru.sbt.ss.kafka.ott.OttSignatureConsumerInterceptor
interceptor.signature.ott.service.url = https://test:123
interceptor.signature.ott.service.hosts = <hosts>
interceptor.signature.ott.module.id = ott-test-moduleA
interceptor.signature.ott.authz.realm = mmt
interceptor.signature.ott.service.cert.alias = ott-service
interceptor.signature.ott.certstore.type = JKS
interceptor.signature.ott.certstore.path = config/jks/ott-test-moduleA.p12
interceptor.signature.ott.trust.store.path = config/jks/ift_sol_std3_ott_public.p12
interceptor.signature.ott.certstore.pwd = <pwd>
interceptor.signature.ott.certstore.private.key.pwd = <pwd>
interceptor.signature.ott.trust.store.pwd = <pwd>
interceptor.signature.ott.client.tls = TLSv1.2
interceptor.signature.ott.request.add.subject = urn:sbrf:names:pprb:1.0:module:id#ott-test-moduleA
#interceptor.signature.ott.request.add.action = urn:sbrf:names:pprb:1.0:action:id#publish
interceptor.signature.ott.request.add.resource = urn:sbrf:names:pprb:1.0:api:interface:fullname#${topic}
interceptor.signature.ott.request.add.environment.realm = ott:realm#mmt