Kafka console tools#
Содержит консольные утилиты kafka-console-producer/kafka-console-consumer, аналогичные утилитам из дистрибутива kafka, для тестирования перехватчиков.
Также позволяют добавить дополнительную директорию с библиотеками в classpath при запуске с помощью аргумента --add-classpath.
Утилиты не будут работать с -shadow версиями перехватчиков, т.к. тоже используют scala
Использование#
Аналогично утилитам из дистрибутива kafka:
bin/kafka-console-producer.shbin/kafka-console-consumer.sh
Подключение дополнительных библиотек#
Первым аргументом при запуске можно передать дополнительную директорию с библиотеками:
bin/kafka-consumer-consumer.sh --add-classpath <относительный/путь/до/директории/с/библиотеками>
Например, при использовании из распакованного дистрибутива kafka-plugins-dist:
- kafka-plugins-dist/
- interceptors/
- validator-interceptor/
- ...
- kafka-console-tools/
- bin/
- config/
- libs/
Можно добавить в classpath зависимости для тестирования validator-interceptor при запуске:
bin/kafka-consumer-consumer.sh --add-classpath ../interceptors/validator-interceptor
Для корректной работы аргумент --add-classpath обязательно должен быть на первом месте, сразу после bin/kafka-consumer-*.sh
Логирование#
Настройки логирования отдельные для producer/consumer находятся по пути config/consumer-logback.xml и config/producer-logback.xml.
По умолчанию логи пишутся в файлы logs/kafka-console-consumer.log и logs/kafka-console-producer.log.
Утилиты используют STDOUT для ввода/вывода сообщений, поэтому для упрощения логи kafka-клиентов пишутся только в файлы.
Примеры конфигураций#
Пример конфигурации без перехватчика:
bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
Пример конфигурации с перехватчиком (Kafka-ott-signature-interceptor):
bootstrap.servers = localhost:9092
security.protocol = PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
interceptor.classes = ru.sbt.ss.kafka.ott.OttSignatureConsumerInterceptor
interceptor.signature.ott.service.url = https://test:123
interceptor.signature.ott.service.hosts = <hosts>
interceptor.signature.ott.module.id = ott-test-moduleA
interceptor.signature.ott.authz.realm = mmt
interceptor.signature.ott.service.cert.alias = ott-service
interceptor.signature.ott.certstore.type = JKS
interceptor.signature.ott.certstore.path = config/jks/ott-test-moduleA.p12
interceptor.signature.ott.trust.store.path = config/jks/ift_sol_std3_ott_public.p12
interceptor.signature.ott.certstore.pwd = <pwd>
interceptor.signature.ott.certstore.private.key.pwd = <pwd>
interceptor.signature.ott.trust.store.pwd = <pwd>
interceptor.signature.ott.client.tls = TLSv1.2
interceptor.signature.ott.request.add.subject = urn:sbrf:names:pprb:1.0:module:id#ott-test-moduleA
#interceptor.signature.ott.request.add.action = urn:sbrf:names:pprb:1.0:action:id#publish
interceptor.signature.ott.request.add.resource = urn:sbrf:names:pprb:1.0:api:interface:fullname#${topic}
interceptor.signature.ott.request.add.environment.realm = ott:realm#mmt