Конфигурирование брокера Kafka#
Для безопасной конфигурации records policy, настройки могут быть указаны в параметрах брокера — в файле server.properties:
records.policy.name— имя policy по умолчанию. Будет применяться, если явно не переопределен параметрrecords.policy.nameпри создании топика или изменении его настроек. Если параметр отсутствует вserver.properties, то используетсяkafka.server.policy.DenyAllRecordsPolicy, которая запрещает запись данных в топик;records.policy.[name].class.name— имя класса или название policy, реализующего проверку сообщений на уровне брокера;records.policy.[name].param1, records.policy.[name].param2— параметры конфигурации policy. Доступны в методеRecordsPolicy#configure;где
[name]— название преднастроенной policy. Его надо указывать при создании топика, который будет проверяться с этой policy. Если не указано, будет применяться название по умолчанию из параметраrecords.policy.name.
Внимание!
Все настройки records policy в
server.propertiesдолжны быть одинаковыми для всех брокеров в кластере во избежании недетерминированного поведения.Если параметр с именем policy по умолчанию
records.policy.nameотличается вserver.propertiesдля брокеров в кластере, то возможно недетерминированное поведение для топика, у которого явно не задан параметрrecords.policy.nameи применено значение по умолчанию. Для такого топика на разных брокерах будет применена разная policy в зависимости от настроек policy по умолчанию конкретного брокера, который фактически обрабатывает клиентский запрос.
Пример:
records.policy.secure-policy.class.name=ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
# Параметры подключения к Schema Registry REST server.
records.policy.secure-policy.url=<Schema-registry-REST-server-address1>,<Schema-registry-REST-server-address2>
records.policy.secure-policy.user=user
# Пароль зашифрован.
records.policy.secure-policy.password.encrypted=uKLIwVWQXoYgOBcqtN4DXA==
# Параметры для расшифровки пароля.
# На стороне пользователя kafka знаний о параметрах подключения к REST server не требуется.
# Достаточно при создании топика указать `records.policy.name=secure-policy`
records.policy.secure-policy.security.encoding.class=ru.sbrf.kafka.security.SimpleTextPasswordDecoder
records.policy.secure-policy.security.encoding.salt=ru.sbrf.kafka.security.FileSaltProvider
records.policy.secure-policy.security.encoding.key=KEY_1
records.policy.secure-policy.security.encoding.salt.location=classpath:/salt.txt
Конфигурация при настроенном SSL на Schema Registry#
Пример настроек при настроенном SSL на Schema Registry:
records.policy.name=https-policy
records.policy.https-policy.class.name=ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
records.policy.https-policy.url=<schema-registry-host:port>
records.policy.https-policy.user=user
records.policy.https-policy.password=${decode:uKLIwVWQXoYgOBcqtN4DXA==}
records.policy.https-policy.ssl.truststore.location=/opt/Apache/kafka/ssl/truststore.jks
records.policy.https-policy.ssl.truststore.password=${decode:uKLIwVWQXoYgOBcqtN4DXA==}
records.policy.https-policy.ssl.keystore.location=/opt/Apache/kafka/ssl/keystore.jks
records.policy.https-policy.ssl.keystore.password=${decode:uKLIwVWQXoYgOBcqtN4DXA==}
records.policy.https-policy.ssl.endpoint.identification.algorithm=all
config.providers=decode
config.providers.decode.class=ru.sbt.ss.kafka.DecryptionConfigProvider
config.providers.decode.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass
config.providers.decode.param.security.encoding.class=ru.sbt.ss.password.decoder.SimpleTextPasswordDecoder
config.providers.decode.param.security.encoding.salt=ru.sbt.ss.password.salt.SbtSaltProvider
Конфигурация при настроенном SSL на Schema Registry и интеграции с SecMan#
records.policy.name=https-policy
records.policy.https-policy.class.name=ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
records.policy.https-policy.url=<schema-registry-host:port>
records.policy.https-policy.user=user
records.policy.https-policy.password=${my_secman:user.password}
records.policy.https-policy.ssl.truststore.location=/opt/Apache/kafka/ssl/secman-truststore.jks
records.policy.https-policy.ssl.truststore.password=${my_secman:truststore.password}
records.policy.https-policy.ssl.endpoint.identification.algorithm=all
records.policy.https-policy.secman.endpoint=<secman-host:port>
records.policy.https-policy.secman.namespace=<namespace>
records.policy.https-policy.secman.fetch.role=role-ga-secman-ospt
records.policy.https-policy.secman.fetch.mount.path=PKI
records.policy.https-policy.secman.fetch.cn=devops.corax.solution.sbt
records.policy.https-policy.secman.role_id=${decode:ZptCiwenOQ881a8tZ5UJTs75vp06VQH5Jaj3r5Z9lydJuDJCrfzS13d152HsP0qn}
records.policy.https-policy.secman.secret_id=${decode:AH89qSfMnuhAi2p66q2drHgqs4+1t4y8Ra9F/ECcRzsj3dCiO03viu7Ajpw8vj9S}
config.providers=decode,my_secman
config.providers.decode.class=ru.sbt.ss.kafka.DecryptionConfigProvider
config.providers.decode.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass
config.providers.decode.param.security.encoding.class=ru.sbt.ss.password.decoder.SimpleTextPasswordDecoder
config.providers.decode.param.security.encoding.salt=ru.sbt.ss.password.salt.SbtSaltProvider
config.providers.my_secman.class=ru.sbrf.kafka.secman.SecmanConfigProvider
config.providers.my_secman.param.endpoint=<secman-host:port>
config.providers.my_secman.param.namespace=<namespace>
config.providers.my_secman.param.data.path=A/DEV/KFKA/KV/kafka-dev
config.providers.my_secman.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass
config.providers.my_secman.param.role_id=${decode:AIjidYrrCWj+wRxqY+zPaEzQEqgZuhHp9wJMkkjXHKQuurznvZjQhXJbMUyPNFqr}
config.providers.my_secman.param.secret_id=${decode:l5DwThFz1ir+CGi+kS+Z5VbvGwpRB4VBrnhiAW4NnPYPFgk22wq8nFWsmhA2Zk2D}
Валидация имени policy на брокере#
Запуск брокера с указанным параметром records.policy.name без настроек именованной policy#
Если в server.properties указан параметр records.policy.name=https-policy, но не указаны настройки с префиксом records.policy.https-policy., то старт брокера завершится с ошибкой в server.log:
[2023-11-14 17:17:31,328] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.errors.InvalidConfigurationException: Parameter 'records.policy.name' is specified, but records policy configs with prefix 'records.policy.https-policy' not found in server.properties.
[2023-11-14 17:17:31,329] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
Запуск брокера с невалидными настройками policy в кластер / Перезапуск брокера с измененными настройками policy#
Рассмотрим сценарий, когда в кластере уже запущены два брокера, созданы топики со включенной валидацией данных по SR и требуется добавить в кластер третий брокер. Либо уже было три брокера, а затем третий брокер был остановлен, изменены настройки policy и произведен перезапуск брокера.
Если по какой-то причине для третьего брокера в server.properties указаны невалидные настройки policy (например, указано другое значение
для имени policy по умолчанию , либо они вообще отсутствуют), тогда третий брокер при старте изменяет параметр records.policy.name тех топиков, для которых явно задан данный параметр и настройки policy которой отсутствуют в server.properties третьего брокера.
В server.log будет сообщение об изменении имени policy:
[2023-11-14 17:33:37,601] INFO Change 'records.policy.name' for topic[name=test1, newValue=kafka.server.policy.DenyAllRecordsPolicy, oldValue=https-policy]. (kafka.server.ZkConfigManager)
Для таких топиков будет выставлена kafka.server.policy.DenyAllRecordsPolicy, которая запрещает запись данных в топик и при каждом запросе на запись будет выбрасывать ошибку:
[2023-11-14 17:35:56,224] ERROR Error when sending message to topic test1 with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.PolicyViolationException: Configs for initially configured policy with name 'https-policy' not found in server.properties.
После того, как на проблемном брокере будут выставлены корректные настройки policy, для топиков с records.policy.name=kafka.server.policy.DenyAllRecordsPolicy нужно изменить параметр на валидное значение.
Создание топика#
Если при создании топика передать невалидное имя policy, топик не будет создан.
Пример:
// Создание топика
./bin/kafka-topics.sh --bootstrap-server host:port --create --topic test --config records.policy.enabled=true --config records.policy.name=invalid
// Ошибка при выполнении
Error while executing topic command : Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
[2023-11-14 17:05:05,811] ERROR org.apache.kafka.common.errors.InvalidConfigurationException: Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
(kafka.admin.TopicCommand$)
Изменение настроек существующего топика#
Если при изменении параметров топика передано невалидное имя policy, изменение не будет выполнено.
Пример:
// Изменение параметров топика
./bin/kafka-configs.sh --bootstrap-server host:port --entity-type topics --entity-name test --alter --add-config records.policy.enabled=true,records.policy.name=invalid
// Ошибка при выполнении
Error while executing config command with args '--bootstrap-server host:port --entity-type topics --entity-name test --alter --add-config records.policy.enabled=true,records.policy.name=invalid'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:359)
at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:326)
at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:97)
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
Настройки топика#
Настройки конфигурации Schema Registry при создании или изменении топика:
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
false |
Включает или выключает проверку данных топика с помощью Records Policy |
|
|
Имя класса или название policy, реализующего проверку сообщений на уровне брокера. Для проверки по схеме должен быть указан класс |
|
null |
Строка конфигурации. Параметры разделяются запятой. Формат |
где
<sr-host>– хост, на котором запущен сервер Schema Registry;<sr-port>– порт, заданный в конфигурационном файлеschema-registry.properties(по умолчанию 8081).
Если при создании или изменении топика явно не указан параметр records.policy.name, то будет применяться policy из одноименного
параметра в server.properties, который определяет значение по умолчанию для текущего брокера.
Если параметр records.policy.name отсутствует в server.properties, то будет применяться kafka.server.policy.DenyAllRecordsPolicy.