Конфигурирование брокера Kafka#

Для безопасной конфигурации records policy, настройки могут быть указаны в параметрах брокера — в файле server.properties:

  • records.policy.name — имя policy по умолчанию. Будет применяться, если явно не переопределен параметр records.policy.name при создании топика или изменении его настроек. Если параметр отсутствует в server.properties, то используется kafka.server.policy.DenyAllRecordsPolicy, которая запрещает запись данных в топик;

  • records.policy.[name].class.name — имя класса или название policy, реализующего проверку сообщений на уровне брокера;

  • records.policy.[name].param1, records.policy.[name].param2 — параметры конфигурации policy. Доступны в методе RecordsPolicy#configure;

    где [name] — название преднастроенной policy. Его надо указывать при создании топика, который будет проверяться с этой policy. Если не указано, будет применяться название по умолчанию из параметра records.policy.name.

Внимание!

Все настройки records policy в server.properties должны быть одинаковыми для всех брокеров в кластере во избежании недетерминированного поведения.

Если параметр с именем policy по умолчанию records.policy.name отличается в server.properties для брокеров в кластере, то возможно недетерминированное поведение для топика, у которого явно не задан параметр records.policy.name и применено значение по умолчанию. Для такого топика на разных брокерах будет применена разная policy в зависимости от настроек policy по умолчанию конкретного брокера, который фактически обрабатывает клиентский запрос.

Пример:

records.policy.secure-policy.class.name=ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
# Параметры подключения к Schema Registry REST server.
records.policy.secure-policy.url=<Schema-registry-REST-server-address1>,<Schema-registry-REST-server-address2>
records.policy.secure-policy.user=user
# Пароль зашифрован.
records.policy.secure-policy.password.encrypted=uKLIwVWQXoYgOBcqtN4DXA==
# Параметры для расшифровки пароля.
# На стороне пользователя kafka знаний о параметрах подключения к REST server не требуется.
# Достаточно при создании топика указать `records.policy.name=secure-policy`
records.policy.secure-policy.security.encoding.class=ru.sbrf.kafka.security.SimpleTextPasswordDecoder
records.policy.secure-policy.security.encoding.salt=ru.sbrf.kafka.security.FileSaltProvider
records.policy.secure-policy.security.encoding.key=KEY_1
records.policy.secure-policy.security.encoding.salt.location=classpath:/salt.txt

Конфигурация при настроенном SSL на Schema Registry#

Пример настроек при настроенном SSL на Schema Registry:

records.policy.name=https-policy
records.policy.https-policy.class.name=ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
records.policy.https-policy.url=<schema-registry-host:port>
records.policy.https-policy.user=user
records.policy.https-policy.password=${decode:uKLIwVWQXoYgOBcqtN4DXA==}
records.policy.https-policy.ssl.truststore.location=/opt/Apache/kafka/ssl/truststore.jks
records.policy.https-policy.ssl.truststore.password=${decode:uKLIwVWQXoYgOBcqtN4DXA==}
records.policy.https-policy.ssl.keystore.location=/opt/Apache/kafka/ssl/keystore.jks
records.policy.https-policy.ssl.keystore.password=${decode:uKLIwVWQXoYgOBcqtN4DXA==}
records.policy.https-policy.ssl.endpoint.identification.algorithm=all

config.providers=decode

config.providers.decode.class=ru.sbt.ss.kafka.DecryptionConfigProvider
config.providers.decode.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass
config.providers.decode.param.security.encoding.class=ru.sbt.ss.password.decoder.SimpleTextPasswordDecoder
config.providers.decode.param.security.encoding.salt=ru.sbt.ss.password.salt.SbtSaltProvider

Конфигурация при настроенном SSL на Schema Registry и интеграции с SecMan#

records.policy.name=https-policy
records.policy.https-policy.class.name=ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
records.policy.https-policy.url=<schema-registry-host:port>
records.policy.https-policy.user=user
records.policy.https-policy.password=${my_secman:user.password}
records.policy.https-policy.ssl.truststore.location=/opt/Apache/kafka/ssl/secman-truststore.jks
records.policy.https-policy.ssl.truststore.password=${my_secman:truststore.password}
records.policy.https-policy.ssl.endpoint.identification.algorithm=all
records.policy.https-policy.secman.endpoint=<secman-host:port>
records.policy.https-policy.secman.namespace=<namespace>
records.policy.https-policy.secman.fetch.role=role-ga-secman-ospt
records.policy.https-policy.secman.fetch.mount.path=PKI
records.policy.https-policy.secman.fetch.cn=devops.corax.solution.sbt
records.policy.https-policy.secman.role_id=${decode:ZptCiwenOQ881a8tZ5UJTs75vp06VQH5Jaj3r5Z9lydJuDJCrfzS13d152HsP0qn}
records.policy.https-policy.secman.secret_id=${decode:AH89qSfMnuhAi2p66q2drHgqs4+1t4y8Ra9F/ECcRzsj3dCiO03viu7Ajpw8vj9S}

config.providers=decode,my_secman

config.providers.decode.class=ru.sbt.ss.kafka.DecryptionConfigProvider
config.providers.decode.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass
config.providers.decode.param.security.encoding.class=ru.sbt.ss.password.decoder.SimpleTextPasswordDecoder
config.providers.decode.param.security.encoding.salt=ru.sbt.ss.password.salt.SbtSaltProvider

config.providers.my_secman.class=ru.sbrf.kafka.secman.SecmanConfigProvider
config.providers.my_secman.param.endpoint=<secman-host:port>
config.providers.my_secman.param.namespace=<namespace>
config.providers.my_secman.param.data.path=A/DEV/KFKA/KV/kafka-dev
config.providers.my_secman.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass
config.providers.my_secman.param.role_id=${decode:AIjidYrrCWj+wRxqY+zPaEzQEqgZuhHp9wJMkkjXHKQuurznvZjQhXJbMUyPNFqr}
config.providers.my_secman.param.secret_id=${decode:l5DwThFz1ir+CGi+kS+Z5VbvGwpRB4VBrnhiAW4NnPYPFgk22wq8nFWsmhA2Zk2D}

Валидация имени policy на брокере#

Запуск брокера с указанным параметром records.policy.name без настроек именованной policy#

Если в server.properties указан параметр records.policy.name=https-policy, но не указаны настройки с префиксом records.policy.https-policy., то старт брокера завершится с ошибкой в server.log:

[2023-11-14 17:17:31,328] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.errors.InvalidConfigurationException: Parameter 'records.policy.name' is specified, but records policy configs with prefix 'records.policy.https-policy' not found in server.properties.
[2023-11-14 17:17:31,329] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)

Запуск брокера с невалидными настройками policy в кластер / Перезапуск брокера с измененными настройками policy#

Рассмотрим сценарий, когда в кластере уже запущены два брокера, созданы топики со включенной валидацией данных по SR и требуется добавить в кластер третий брокер. Либо уже было три брокера, а затем третий брокер был остановлен, изменены настройки policy и произведен перезапуск брокера.

Если по какой-то причине для третьего брокера в server.properties указаны невалидные настройки policy (например, указано другое значение для имени policy по умолчанию , либо они вообще отсутствуют), тогда третий брокер при старте изменяет параметр records.policy.name тех топиков, для которых явно задан данный параметр и настройки policy которой отсутствуют в server.properties третьего брокера.

В server.log будет сообщение об изменении имени policy:

[2023-11-14 17:33:37,601] INFO Change 'records.policy.name' for topic[name=test1, newValue=kafka.server.policy.DenyAllRecordsPolicy, oldValue=https-policy]. (kafka.server.ZkConfigManager)

Для таких топиков будет выставлена kafka.server.policy.DenyAllRecordsPolicy, которая запрещает запись данных в топик и при каждом запросе на запись будет выбрасывать ошибку:

[2023-11-14 17:35:56,224] ERROR Error when sending message to topic test1 with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.PolicyViolationException: Configs for initially configured policy with name 'https-policy' not found in server.properties.

После того, как на проблемном брокере будут выставлены корректные настройки policy, для топиков с records.policy.name=kafka.server.policy.DenyAllRecordsPolicy нужно изменить параметр на валидное значение.

Создание топика#

Если при создании топика передать невалидное имя policy, топик не будет создан.

Пример:

// Создание топика
./bin/kafka-topics.sh --bootstrap-server host:port --create --topic test --config records.policy.enabled=true --config records.policy.name=invalid

// Ошибка при выполнении
Error while executing topic command : Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
[2023-11-14 17:05:05,811] ERROR org.apache.kafka.common.errors.InvalidConfigurationException: Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
 (kafka.admin.TopicCommand$)

Изменение настроек существующего топика#

Если при изменении параметров топика передано невалидное имя policy, изменение не будет выполнено.

Пример:

// Изменение параметров топика
./bin/kafka-configs.sh --bootstrap-server host:port --entity-type topics --entity-name test --alter --add-config records.policy.enabled=true,records.policy.name=invalid

// Ошибка при выполнении
Error while executing config command with args '--bootstrap-server host:port --entity-type topics --entity-name test --alter --add-config records.policy.enabled=true,records.policy.name=invalid'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
        at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:359)
        at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:326)
        at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:97)
        at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Records policy configs with prefix 'records.policy.invalid.' not found in server.properties. Valid names for records policies: https-policy.

Настройки топика#

Настройки конфигурации Schema Registry при создании или изменении топика:

Имя

Значение по умолчанию

Описание

records.policy.enabled

false

Включает или выключает проверку данных топика с помощью Records Policy

records.policy.name

kafka.server.policy.DenyAllRecordsPolicy

Имя класса или название policy, реализующего проверку сообщений на уровне брокера. Для проверки по схеме должен быть указан класс ru.sbrf.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy. По умолчанию установлена политика DenyAllRecordsPolicy, которая запрещает запись данных в топик.

records.policy.config

null

Строка конфигурации. Параметры разделяются запятой. Формат "url=http://<sr-host>:<sr-port>,user=myname,password=mypassword". url — адрес Schema Registry REST server, user,password - учетные данные. Обязательны в случае включенной безопасности

где

  • <sr-host> – хост, на котором запущен сервер Schema Registry;

  • <sr-port> – порт, заданный в конфигурационном файле schema-registry.properties (по умолчанию 8081).

Если при создании или изменении топика явно не указан параметр records.policy.name, то будет применяться policy из одноименного параметра в server.properties, который определяет значение по умолчанию для текущего брокера.

Если параметр records.policy.name отсутствует в server.properties, то будет применяться kafka.server.policy.DenyAllRecordsPolicy.