Сценарии администрирования#
Для выполнении действий, описанных в сценариях администрирования, пользователю должны быть назначены соответствующие разрешения на выполнение операций из ролевой модели.
Разрешения на выполнение операций полностью соответствуют разрешениям Apache Kafka.
Конфигурирование Zookeeper#
Конфигурирование Брокера Zookeeper осуществляется в файле config/zookeeper.properties.
Допускается создание файлов конфигурации с различными именами, например:
config/zookeeper-1-node-cluster.properties
config/zookeeper-3-node-cluster.properties
config/zookeeper-3-node-cluster-on-1-machine.properties
config/zookeeper-5-node-cluster.properties
При запуске Zookeeper достаточно указать нужный конфигурационный файл.
Конфигурирование узла Zookeeper#
В данном подразделе описывается конфигурирование Zookeeper для запуска в качестве standalone-приложения на одной машине. Этот вид конфигурации не обеспечивает отказоустойчивости и не рекомендован для использования в промышленной эксплуатации.
Конфигурирование узла Apache Zookeeper осуществляется в файле zookeeper.properties.
Минимальная конфигурация#
Пример файла конфигураций zookeeper.properties для одного Брокера:
dataDir=/path/to/zookeeper-data
clientPort=2181
tickTime=2000
Шаги создания конфигурационного файла:
Для переменной
dataDirукажите путь к директории, в которую Zookeeper будет сохранять файлы с данными, необходимыми ему для работы.Пример
dataDir=/var/log/kafka_2.11-0.9.0.0/zookeeper-dataДля переменной
clientPortукажите номер порта, к которому будут подключаться клиенты.Пример
clientPort=2181Для переменной
tickTimeукажите единицу измерения времени Zookeeper — это время в миллисекундах, которое уходит на один «тик» или «такт» Zookeeper.tickTimeнеявно используется в настройках для различных временных параметров Zookeeper. Например, для временной настройкиinitLimit=10— это означает, чтоinitLimitбудет выполняться за10 * tickTimeмиллисекунд.Пример
tickTime=2000
Расширенная конфигурация#
Существует ряд дополнительных настроек, которые можно включить в конфигурационный файл:
maxClientCnxns— максимально допустимое количество соединений (на уровне сокетов), устанавливаемых клиентом (IP-адрес) с одним из серверов кластера Zookeeper.Если значение равно нулю или настройка не указана в файле конфигураций, то количество одновременных подключений считается неограниченным.
Данная настройка позволяет предотвратить некоторые виды DoS-атак.
Пример
maxClientCnxns=0dataLogDir— позволяет выделить отдельную директорию для хранения транзакционного лога Zookeeper. По умолчанию все транзакционные логи Zookeeper (файлыlog.) хранятся вместе с файламиsnapshot., т. н. «снепшотами», в одной общей директорииdataDir.Свойство
dataLogDirможет быть использовано для сохранения транзакционных логов в памяти отдельного устройства, т. е. в памяти другого физического, а не логического диска, для достижения наилучшей производительности (за счет уменьшения нагрузки на диск).Пример
dataLogDir=/another/path/to/zookeeper-logsДанные Zookeeper — это персистентные (неизменяемые) копии Zookeeper-узлов (
znodes), хранимые соответствующим ансамблем на диске в виде файлов двух видов:Транзакционные логи (
transaction log) — это файлы с префиксомlog., в которые в последовательном порядке записываются все изменения, происходящие с Zookeeper-узлами.Снепшоты (
snapshot) — это файлы с префиксомsnapshot.Когда файл транзакционного лога достигает определенного размера, создается копия текущего состояния всех Zookeeper-узлов в виде снепшота. Этот снепшот заменяет все предыдущие логи.
Примечание
Устаревшие версии файлов транзакционных логов и снепшотов не удаляются с диска сервером Zookeeper — эта задача полностью ложится на администратора Zookeeper.
globalOutstandingLimit— ограничение времени, позволяющее регулировать количество обращений клиентов к серверу Zookeeper. Поскольку клиенты отправляют запросы быстрее, чем сервер успевает их обработать, сервер может не справиться и израсходовать всю доступную память. По умолчанию число запросов, находящихся в обработке в некоторый момент времени, не превышает1000.Пример
globalOutstandingLimit=1000preAllocSize— размер блока лог-файлов транзакций. По умолчанию равен 64 Мбайт. Использование транзакционного лога минимизирует количество обращений к диску. Если снепшоты Zookeeper создаются слишком часто, размер логов транзакций может не достигать 64 Мбайт. В таких ситуациях включение этого параметра позволяет оптимизировать расход объема дискового пространства.snapCount— число транзакций, которые будут записаны в лог транзакций после создания очередного «снепшота» («snapshot») — файла данных Zookeeper. По умолчанию —10000.Пример
snapCount =10000traceFile— файл трассировки в форматеtraceFile.year.month.day, в который будут записываться все запросы клиентов. Полезен в целях отладки. Побочный эффект — некоторый спад производительности.
Текущая используемая конфигурация#
Файл конфигураций zookeeper.properties для Брокера следующий:
dataDir=/tmp/kafka/zookeeper
clientPort=2185
maxClientCnxns=0
Конфигурирование кластера Zookeeper#
Конфигурирование кластера Apache ZooKeeper осуществляется в файле zookeeper.properties.
Минимальная конфигурация кластера#
Пример минимального файла конфигураций zookeeper.properties для кластера из трех серверов:
server.1=zookeeper-1-ip:2888:3888
server.2=zookeeper-2-ip:2888:3888
server.3=zookeeper-3-ip:2888:3888
dataDir=/path/to/zookeeper-data
clientPort=2181
tickTime=2000
initLimit=5
syncLimit=2
Шаги создания конфигурационного файла:
Добавьте в файл переменные
server.INDEX=IP:PORT1:PORT2, где:INDEX— индекс (myid) узла Zookeeper в кластере;IP— адрес данного сервера;PORT1— порт для подключения серверов-последователей в кластере к лидеру;PORT2— порт для подключения остальных сервисов Zookeeper в момент выбора сервиса-лидера в кластере.
Если необходимо запустить кластер Zookeeper на нескольких серверах, то в конфигурационном файле укажите адреса всех серверов.
Пример
server.1=xxx.xxx.1.1:2889:3889 server.2=xxx.xxx.1.2:2889:3889Для переменной
dataDirукажите путь к директории, в которую Zookeeper будет сохранять файлы с данными, необходимыми ему для работы.Пример
dataDir=/var/log/kafka_2.11-0.9.0.0/zookeeper-dataДля переменной
clientPortукажите номер порта, к которому будут подключаться клиенты.Пример
clientPort=2181Для переменной
tickTimeукажите единицу измерения времени Zookeeper — это время в миллисекундах, которое уходит на один «тик» или «такт» Zookeeper.tickTimeнеявно используется в настройках для различных временных параметров Zookeeper. Например, для временной настройкиinitLimit=10— это означает, чтоinitLimitбудет выполняться за10 * tickTimeмиллисекунд.По умолчанию
tickTime=2000миллисекунд.Пример
tickTime=2000Добавьте в файл переменную
initLimit— максимальное допустимое количество «тиков» (tickTime), которое может пройти, прежде чем узел кластера синхронизируется при запуске. При превышении будет сгенерирована ошибка.Пример
initLimit=10Т. е. максимальное допустимое время на синхронизацию узла кластера при запуске будет равно (
tickTime*initLimit) миллисекунд.Примечание
Это обязательное свойство, без которого узел Zookeeper не будет запущен.
Добавьте в файл переменную
syncLimit— максимальное допустимое количество «тиков» (tickTime) между отправкой запроса и получением ответа, прежде чем будет сгенерировано исключение.Пример
syncLimit=5Примечание
Это обязательное свойство, без которого узел Zookeeper не будет запущен.
На этом минимальное конфигурирование кластера Zookeeper завершается.
Расширенная конфигурация кластера#
Существует ряд дополнительных настроек, которые можно включить в конфигурационный файл:
electionAlg— тип алгоритма для выбора лидера. Возможные значения:0— версия алгоритма для выбора лидера — это оригинальная версия на основе UDP (User Datagram Protocol);1— версия алгоритма для быстрого выбора лидера на основе UDP, неавторизованный;2— версия алгоритма для быстрого выбора лидера на основе версии1, но авторизованный;3— версия алгоритма для быстрого выбора лидера на основе TCP.
Версия
3выбирается по умолчанию, версии0,1и2носят статус устаревших и будут удалены в следующем релизе.Пример
electionAlg=3leaderServes— опция разрешает клиентам подключаться к серверу-лидеру. Значение по умолчанию —yes. Одна из основных задач сервера-лидера — следить за обновлениями и координировать. Отключением этой опции (установкой значенияno) можно добиться некоторого прироста производительности при координации обновлений.Пример
leaderServes=yesСвойства, используемые совместно:
group.x=nnnn[:nnnn]— позволяет построить кворум с иерархической структурой, гдеx— это идентификатор кворума, а в качестве значения указывается список идентификаторов серверов, разделенных двоеточием.Примечание
Группы не должны пересекаться, т. е. объединение всех групп должно давать полный список всех серверов.
weight.x=nnnnn— при использовании вместе сgroup.x=nnnn[:nnnn]позволяет назначить «вес» каждому серверу, который будет учитываться при выборе лидера. По умолчанию вес равен1.
Пример
group.1=1:2:3 group.2=4:5:6 group.3=7:8:9 weight.1=1 weight.2=1 weight.3=1 weight.4=1 weight.5=1 weight.6=1 weight.7=1 weight.8=1 weight.9=1
Следует учитывать, что запуск всех узлов кластера должен происходить за промежуток времени, не превышающий 10 тиков (heartbeat), заданных в строке initLimit=10. Поэтому лучше создать скрипт последовательного запуска всех узлов кластера с одной машины.
Запуск каждого узла Zookeeper выполняется командой:
bin/zookeeper-server-start.sh -daemon
config/zookeeper.properties
Или с параллельным выводом файла логов в консоль:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail --f logs/zookeeper.out
Запуск всех узлов Zookeeper с одного сервера выполняется специальным скриптом:
bin/zookeeper-cluster-start.sh
config/zookeeper-cluster.properties
Где config/zookeeper-cluster.properties — файл конфигураций кластера с IP-адресами всех серверов, с которых производится запуск, а также пользователей, путей к дистрибутиву Corax и имен файлов конфигураций Zookeeper-брокеров. Формат файла приведен в таблице ниже.
IP-адрес сервера |
Имя пользователя |
Путь к дистрибутиву Corax |
Файл конфигураций Zookeeper |
|---|---|---|---|
yyy.yyy.93.87 |
mon99usr |
kafka\2.11-0.9.0.0/ |
zookeeper.properties |
yyy.yyy.93.88 |
mon99usr |
kafka\2.11-0.9.0.0/ |
zookeeper.properties |
yyy.yyy.97.221 |
mon99usr |
kafka\2.11-0.9.0.0/ |
zookeeper.properties |
Проверка работоспособности#
При запуске всех узлов кластера Zookeeper в файлах logs/zookeeper.out должны быть следующие записи:
В файлах
logs/zookeeper.outсерверов-последователей:INFO FOLLOWING - LEADER ELECTION TOOK - 847 (org.apache.zookeeper.server.quorum.Learner)В файле
logs/zookeeper.outсервера-лидера:INFO LEADING - LEADER ELECTION TOOK - 408 (org.apache.zookeeper.server.quorum.Leader)
Также в консоли сервера-лидера должна быть запись, информирующая о создании кворума в кластере с перечислением ID узлов:
INFO Have quorum of supporters, sids: [ 1,2 ]; starting up and setting
last processed zxid: 0x100000000
(org.apache.zookeeper.server.quorum.Leader)
Тестовый запуск кластера Zookeeper (2 узла)#
Тестовый запуск кластера Zookeeper из двух узлов производится на двух удаленных серверах:
Server-1: yyy.yyy.93.87
Server-2: yyy.yyy.93.88
Дополнительная информация:
версия дистрибутива Corax:
kafka\2.11-0.9.0.0;версия Zookeeper:
zookeeper-3.4.6;имя пользователя:
mon99usr;\$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где\$KAFKA_HOME— корневая директория дистрибутива Corax.
Для тестового запуска кластера Zookeeper выполните следующие действия:
На каждом из серверов создайте директорию, где будут храниться данные Zookeeper:
mkdir --p /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/На каждом из серверов создайте файл
myidв директории/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/, созданной на предыдущем шаге (1). Запишите в файл соответствующий уникальный идентификатор узла Zookeeper:на сервере
Server-1в файлmyidзапишите1:echo 1 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 1на сервере
Server-2в файлmyidзапишите2:echo 2 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 2
Внесите следующие настройки в файл конфигурации
\$KAFKA_HOME/config/zookeeper.configдля обоих серверов:server.1=yyy.yyy.93.87:2888:3888 server.2=yyy.yyy.93.88:2888:3888 clientPort=3181 dataDir=/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/ maxClientCnxns=60 tickTime=2000 initLimit=10 syncLimit=5Где:
dataDir— директория для хранения данных Zookeeper, которая была создана на шаге (1).server.1— адресServer-1; число1вserver.1— этоmyidсервераServer-1;server.2— адресServer-2; число2вserver.2— этоmyidсервераServer-2.
Запустите кластер Zookeeper из двух узлов:
Выполните запуск узла Zookeeper на
Server-1:bin/zookeeper-server-start.sh --daemon config/zookeeper.config && tail --f logs/zookeeper.out [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0] bin/zookeeper-server-start.sh config/zookeeper.properties [YYYY-MM-DD 15:25:19,172] INFO Reading configuration from: config/zookeeper.properties [YYYY-MM-DD 15:25:19,174] WARN No server failure will be tolerated. You need at least 3 servers [YYYY-MM-DD 15:25:19,174] INFO Defaulting to majority quorums [YYYY-MM-DD 15:25:19,179] INFO autopurge.snapRetainCount set to 3 [YYYY-MM-DD 15:25:19,179] INFO autopurge.purgeInterval set to 0 [YYYY-MM-DD 15:25:19,179] INFO Purge task is not scheduled. [YYYY-MM-DD 15:25:19,198] INFO Starting quorum peer [YYYY-MM-DD 15:25:19,210] INFO binding to port 0.0.0.0/0.0.0.0:3181 [YYYY-MM-DD 15:25:19,234] INFO tickTime set to 2000 [YYYY-MM-DD 15:25:19,234] INFO minSessionTimeout set to -1 [YYYY-MM-DD 15:25:19,234] INFO maxSessionTimeout set to -1 [YYYY-MM-DD 15:25:19,234] INFO initLimit set to 10 [YYYY-MM-DD 15:25:19,247] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001 [YYYY-MM-DD 15:25:19,260] INFO My election bind port: yyy.yyy.93.87:3888 [YYYY-MM-DD 15:25:19,270] INFO New election. My id = 1, proposed zxid=0x700000001 [YYYY-MM-DD 15:25:19,272] INFO Notification: 1 (message format version), 1 (n.leader), 0x700000001 (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x8 (n.peerEpoch) LOOKING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection) [YYYY-MM-DD 15:25:19,284] WARN Cannot open channel to 2 at election address /yyy.yyy.93.88:3888 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) ... [YYYY-MM-DD 15:25:19,892] INFO Notification time out: 800 (org.apache.zookeeper.server.quorum.FastLeaderElection) [YYYY-MM-DD 15:25:20,172] INFO Received connection request /yyy.yyy.93.88:45491 (org.apache.zookeeper.server.quorum.QuorumCnxManager) [YYYY-MM-DD 15:25:20,379] INFO FOLLOWING [YYYY-MM-DD 15:25:20,384] INFO TCP NoDelay set to: true [YYYY-MM-DD 15:25:20,390] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [YYYY-MM-DD 15:25:20,390] INFO Server environment:host.name=SBT-IPO-203.youre.adress.ru [YYYY-MM-DD 15:25:20,390] INFO Server environment:java.version=1.8.0_60 [YYYY-MM-DD 15:25:20,390] INFO Server environment:java.home=/usr/java/jre1.8.0_60 [YYYY-MM-DD 15:25:20,390] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/.. [YYYY-MM-DD 15:25:20,391] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib [YYYY-MM-DD 15:25:20,391] INFO Server environment:java.io.tmpdir=/tmp [YYYY-MM-DD 15:25:20,391] INFO Server environment:java.compiler=\<NA\> [YYYY-MM-DD 15:25:20,391] INFO Server environment:os.name=Linux [YYYY-MM-DD 15:25:20,391] INFO Server environment:os.arch=amd64 [YYYY-MM-DD 15:25:20,391] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [YYYY-MM-DD 15:25:20,391] INFO Server environment:user.name=mon99usr [YYYY-MM-DD 15:25:20,391] INFO Server environment:user.home=/home/mon99usr [YYYY-MM-DD 15:25:20,391] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [YYYY-MM-DD 15:25:20,393] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [YYYY-MM-DD 15:25:20,394] INFO FOLLOWING - LEADER ELECTION TOOK - 1124 (org.apache.zookeeper.server.quorum.Learner) [YYYY-MM-DD 15:25:20,421] INFO Getting a diff from the leader 0x700000001 (org.apache.zookeeper.server.quorum.Learner) [YYYY-MM-DD 15:25:20,425] INFO Snapshotting: 0x700000001 to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001Запуск выполнен успешно.
Выполните запуск узла Zookeeper на
Server-2:bin/zookeeper-server-start.sh --daemon config/zookeeper.config && tail --f logs/zookeeper.out [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh config/zookeeper.properties [YYYY-MM-DD 15:25:19,874] INFO Reading configuration from: config/zookeeper.properties [YYYY-MM-DD 15:25:19,878] WARN No server failure will be tolerated. You need at least 3 servers. [YYYY-MM-DD 15:25:19,878] INFO Defaulting to majority quorums [YYYY-MM-DD 15:25:19,889] INFO autopurge.snapRetainCount set to 3 [YYYY-MM-DD 15:25:19,889] INFO autopurge.purgeInterval set to 0 [YYYY-MM-DD 15:25:19,889] INFO Purge task is not scheduled. [YYYY-MM-DD 15:25:19,941] INFO Starting quorum peer [YYYY-MM-DD 15:25:19,971] INFO binding to port 0.0.0.0/0.0.0.0:3181 [YYYY-MM-DD 15:25:20,037] INFO tickTime set to 2000 [YYYY-MM-DD 15:25:20,037] INFO minSessionTimeout set to -1 [YYYY-MM-DD 15:25:20,037] INFO maxSessionTimeout set to -1 [YYYY-MM-DD 15:25:20,038] INFO initLimit set to 10 [YYYY-MM-DD 15:25:20,128] INFO My election bind port: /yyy.yyy.93.88:3888 [YYYY-MM-DD 15:25:20,152] INFO LOOKING [YYYY-MM-DD 15:25:20,155] INFO New election. My id = 2, proposed zxid=0x700000001 [YYYY-MM-DD 15:25:20,176] INFO Notification: 1 (message format version), 2 (n.leader), 0x700000001 (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x8 (n.peerEpoch) LOOKING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection) ... [YYYY-MM-DD 15:25:20,381] INFO LEADING [YYYY-MM-DD 15:25:20,386] INFO TCP NoDelay set to: true [YYYY-MM-DD 15:25:20,395] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [YYYY-MM-DD 15:25:20,395] INFO Server environment:host.name=SBT-IPO-204.youre.adress.ru [YYYY-MM-DD 15:25:20,395] INFO Server environment:java.version=1.8.0_66 [YYYY-MM-DD 15:25:20,395] INFO Server environment:java.home=/usr/java/jre1.8.0_66 [YYYY-MM-DD 15:25:20,395] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../libs/scala- (org.apache.zookeeper.server.ZooKeeperServer) [YYYY-MM-DD 15:25:20,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib [YYYY-MM-DD 15:25:20,396] INFO Server environment:java.io.tmpdir=/tmp [YYYY-MM-DD 15:25:20,396] INFO Server environment:java.compiler=\<NA\> [YYYY-MM-DD 15:25:20,396] INFO Server environment:os.name=Linux [YYYY-MM-DD 15:25:20,396] INFO Server environment:os.arch=amd64 [YYYY-MM-DD 15:25:20,396] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [YYYY-MM-DD 15:25:20,396] INFO Server environment:user.name=mon99usr [YYYY-MM-DD 15:25:20,396] INFO Server environment:user.home=/home/mon99usr [YYYY-MM-DD 15:25:20,396] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [YYYY-MM-DD 15:25:20,397] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 (org.apache.zookeeper.server.ZooKeeperServer) [YYYY-MM-DD 15:25:20,398] INFO LEADING - LEADER ELECTION TOOK - 243 (org.apache.zookeeper.server.quorum.Leader) [YYYY-MM-DD 15:25:20,410] INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer\@64c4b35a [YYYY-MM-DD 15:25:20,419] INFO Synchronizing with Follower sid: 1 maxCommittedLog=0x700000001 minCommittedLog=0x300000001 peerLastZxid=0x700000001 [YYYY-MM-DD 15:25:20,419] INFO Sending DIFF [YYYY-MM-DD 15:25:20,431] INFO Received NEWLEADER-ACK message from 1 [YYYY-MM-DD 15:25:20,433] INFO Have quorum of supporters, sids: [ 1,2 ]; starting up and setting last processed zxid: 0x900000000 (org.apache.zookeeper.server.quorum.Leader)Запуск выполнен успешно.
Пояснения к логам Zookeeper на сервере Server-1:
WARN No server failure will be tolerated. You need at least 3 serversВ случае отказа одного из серверов Zookeeper сервис Zookeeper не сможет продолжить свою работу, т. к. нарушено правило кворума о строгом большинстве доступных узлов. Т. е. число работоспособных узлов в Zookeeper-кластере всегда должно быть больше числа неработоспособных узлов. В данном случае узла два и отказ одного из узлов приведет к неработоспособности сервиса.
WARN Cannot open channel to 2 at election address /yyy.yyy.93.88:3888 java.net.ConnectException: Connection refusedОтказ при попытке соединения с
Server-2, т. к.Server-2в данный момент еще не был запущен.INFO Notification time out: 800 (org.apache.zookeeper.server.quorum.FastLeaderElection)Тайм-аут при попытке выбрать лидера.
INFO Received connection request /yyy.yyy.93.88:45491Получен ответ на запрос о соединении с сервером
Server-2.INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2Описание конфигурации созданного сервера.
INFO Snapshotting: 0x700000001 to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001Сохранение образа текущего состояния Zookeeper.
INFO FOLLOWINGПосле успешного соединения
Server-1сServer-2:Server-1становится сервером-последователем;Server-2становится сервером-лидером.
FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn\'t match stored brokerId 1 in meta.propertiesДанная ошибка запуска Zookeeper на сервере
Server-2происходит по причине конфликта в служебный файл Zookeepermeta.propertiesзаписался неверный идентификатор брокераbroker.id=1, в то время как идентификаторmyidсервераServer-2указан, как равный двум. Решением проблемы будет удаление файлаmeta.properties, тогда при новом запуске Zookeeper этот файл будет сгенерирован автоматически и с корректными настройками.
Пояснения к логам Zookeeper на сервере Server-2:
INFO LOOKINGINFO New election. My id = 2, proposed zxid=0x700000001...INFO LEADING -- LEADING ELECTION TOOK -- 250Поиск сервера-лидера (т. к. сервер-лидер еще не выбран), голосование на выбор сервера-лидера (каждый из серверов предлагает себя), сервером-лидером выбран
Server-2, аServer-1автоматически становится сервером-последователем.WARN Cannot open channel to 2 at election address /yyy.yyy.93.88:3888(org.apache.zookeeper.server.quorum.QuorumCnxManager)java.net.ConnectException: Connection refusedПопытка подключиться к отключенному серверу для выбора лидера. Сообщение возникает, если отключить
Server-1илиServer-2во время работы обоих серверов.
Тестовый запуск кластера Zookeeper (3 узла)#
Тестовый запуск кластера Zookeeper из трех узлов и кластера Corax из трех узлов будет производиться на трех удаленных серверах:
Server-1: yyy.yyy.93.87 [ 1 Брокер Zookeeper, 1 Брокер Kafka ]
Server-2: yyy.yyy.93.88 [ 1 Брокер Zookeeper, 1 Брокер Kafka ]
Server-3: yyy.yyy.97.221 [ 1 Брокер Zookeeper, 1 Брокер Kafka ]
Дополнительная информация:
версия дистрибутива Corax: kafka\2.11-0.9.0.0;
версия Zookeeper: zookeeper-3.4.6;
имя пользователя:
mon99usr;\$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где\$KAFKA_HOME— домашняя корневая директория дистрибутива Corax.
Конфигурация кластера Zookeeper из трех узлов
Каждый узел располагается на отдельном сервере.
Для конфигурации кластера Zookeeper выполните следующие действия:
На каждом из серверов создайте директорию, где будут храниться данные Zookeeper:
mkdir --p /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/На каждом из серверов создайте файл
myidв директории/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/, созданной на предыдущем шаге (1). Это уникальный идентификатор каждого узла Zookeeper, запишите его в файл:на сервере
Server-1в файлmyidзапишите1:echo 1 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 1на сервере
Server-2в файлmyidзапишите2:echo 2 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 2на сервере
Server-3в файлmyidзапишите3:echo 3 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 3
Внесите следующие настройки в файл конфигурации.
\$KAFKA_HOME/config/zookeeper.configдля трех серверов одинаков:server.1=yyy.yyy.93.87:2888:3888 server.2=yyy.yyy.93.88:2888:3888 server.3=yyy.yyy.97.221:2888:3888 clientPort=3181 dataDir=/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/ tickTime=2000 initLimit=10 syncLimit=5 maxClientCnxns=60Где:
dataDir— директория для хранения данных Zookeeper, которая была создана на шаге (1);server.1— адресServer-1; число1вserver.1— этоmyidсервераServer-1;server.2— адресServer-2; число2вserver.2— этоmyidсервераServer-2;server.3— адресServer-3; число3вServer.3— этоmyidсервераServer-3.
Запуск кластера Zookeeper из трех узлов
Выполните запуск узла Zookeeper на сервере
Server-1в режиме демона с выводом логов:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [YYYY-MM-DD 12:56:43,375] INFO Reading configuration from: config/zookeeper.properties [YYYY-MM-DD 12:56:43,377] INFO Defaulting to majority quorums [YYYY-MM-DD 12:56:43,382] INFO autopurge.snapRetainCount set to 3 [YYYY-MM-DD 12:56:43,382] INFO autopurge.purgeInterval set to 0 [YYYY-MM-DD 12:56:43,382] INFO Purge task is not scheduled. [YYYY-MM-DD 12:56:43,411] INFO Starting quorum peer [YYYY-MM-DD 12:56:43,424] INFO binding to port 0.0.0.0/0.0.0.0:3181 [YYYY-MM-DD 12:56:43,447] INFO tickTime set to 2000 [YYYY-MM-DD 12:56:43,448] INFO minSessionTimeout set to -1 [YYYY-MM-DD 12:56:43,448] INFO maxSessionTimeout set to -1 [YYYY-MM-DD 12:56:43,448] INFO initLimit set to 10 [YYYY-MM-DD 12:56:43,467] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac [YYYY-MM-DD 12:56:43,507] INFO My election bind port: /yyy.yyy.93.87:3888 [YYYY-MM-DD 12:56:43,518] INFO LOOKING [YYYY-MM-DD 12:56:43,520] INFO New election. My id = 1, proposed zxid=0x310000003d [YYYY-MM-DD 12:56:43,522] INFO Notification: 1 (message format version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:56:43,530] WARN Cannot open channel to 2 at election address /yyy.yyy.93.88:3888 java.net.ConnectException: Connection refused [YYYY-MM-DD 12:56:43,534] WARN Cannot open channel to 3 at election address /yyy.yyy.97.221:3888 java.net.ConnectException: Connection refused [YYYY-MM-DD 12:56:56,388] INFO Received connection request /yyy.yyy.93.88:11287 (org.apache.zookeeper.server.quorum.QuorumCnxManager) [YYYY-MM-DD 12:56:56,595] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer) [YYYY-MM-DD 12:56:56,600] INFO TCP NoDelay set to: true [YYYY-MM-DD 12:56:56,606] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [YYYY-MM-DD 12:56:56,607] INFO Server environment:host.name=SBT-IPO-203.youre.adress.ru [YYYY-MM-DD 12:56:56,607] INFO Server environment:java.version=1.8.0_60 [YYYY-MM-DD 12:56:56,607] INFO Server environment:java.home=/usr/java/jre1.8.0_60 [YYYY-MM-DD 12:56:56,607] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../.... [YYYY-MM-DD 12:56:56,607] INFO Server environment:java.library.path=/usr/java/packages/... [YYYY-MM-DD 12:56:56,607] INFO Server environment:java.io.tmpdir=/tmp [YYYY-MM-DD 12:56:56,607] INFO Server environment:java.compiler=\<NA\> [YYYY-MM-DD 12:56:56,607] INFO Server environment:os.name=Linux [YYYY-MM-DD 12:56:56,607] INFO Server environment:os.arch=amd64 [YYYY-MM-DD 12:56:56,607] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [YYYY-MM-DD 12:56:56,607] INFO Server environment:user.name=mon99usr [YYYY-MM-DD 12:56:56,607] INFO Server environment:user.home=/home/mon99usr [YYYY-MM-DD 12:56:56,607] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [YYYY-MM-DD 12:56:56,609] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 Datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [YYYY-MM-DD 12:56:56,609] INFO FOLLOWING - LEADER ELECTION TOOK - 13090 (org.apache.zookeeper.server.quorum.Learner) [YYYY-MM-DD 12:56:56,616] WARN Unexpected exception, tries=0, connecting to /yyy.yyy.93.88:2888 java.net.ConnectException: Connection refused ... [YYYY-MM-DD 12:56:57,640] INFO Getting a diff from the leader 0x310000003d [YYYY-MM-DD 12:56:57,645] INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003d [YYYY-MM-DD 12:57:04,004] WARN Got zxid 0x3200000001 expected 0x1 [YYYY-MM-DD 12:57:04,004] INFO Creating new log file: log.3200000001 [YYYY-MM-DD 12:57:06,472] INFO Received connection request /yyy.yyy.97.221:47171 [YYYY-MM-DD 12:57:06,475] INFO Notification: 1 (message format version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) FOLLOWING (my state)Где
tail -f logs/zookeeper.out— вывод текущего лога Zookeeper-брокера из файла в реальном времени.Запуск Zookeeper-брокера на
Server-1выполнен успешно.Краткий анализ логов
Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере yyy.yyy.93.88 и с Брокером на сервере yyy.yyy.97.221, т. к. Брокеры на этих серверах еще не запущены:
INFO LOOKING WARN Cannot open channel to 2 at election address /yyy.yyy.93.88:3888 java.net.ConnectException: Connection refused WARN Cannot open channel to 3 at election address /yyy.yyy.97.221:3888 java.net.ConnectException: Connection refusedПринят запрос на соединение от Брокера на сервере
Server-2по адресу yyy.yyy.93.88 в связи со запуском этого Брокера:INFO Received connection request /yyy.yyy.93.88:11287Как правило, после этого сообщения новый Брокер включается в кластер.
Выбран сервер-лидер — это только что запущенный Брокер на сервере
Server-2. Брокер на текущем сервереServer-1выбран в качестве сервера-последователя (FOLLOWER):INFO FOLLOWINGСинхронизация Брокера-последователя на сервере
Server-1с Брокером-лидером на сервереServer-2: получение от сервера-лидераServer-2текущего состояния системы (diff). В результате на сервереServer-1будет создан и сохранен соответствующий образ (snapshot) только что полученных данных:INFO Getting a diff from the leader 0x310000003d INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003dПринят запрос на соединение от Брокера на сервере
Server-3по адресу yyy.yyy.97.221 в связи с запуском этого Брокера:INFO Received connection request /yyy.yyy.97.221:11287С этого момента новый Брокер включается в кластер.
После сообщения
INFO Notification: 1...вывод логов прекращается в связи с отсутствием дальнейших событий.
Выполните запуск узла Zookeeper на сервере
Server-2в режиме демона с выводом логов:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [YYYY-MM-DD 12:56:56,055] INFO Reading configuration from: config/zookeeper.properties [YYYY-MM-DD 12:56:56,058] INFO Defaulting to majority quorums [YYYY-MM-DD 12:56:56,067] INFO autopurge.snapRetainCount set to 3 [YYYY-MM-DD 12:56:56,067] INFO autopurge.purgeInterval set to 0 [YYYY-MM-DD 12:56:56,067] INFO Purge task is not scheduled. [YYYY-MM-DD 12:56:56,116] INFO Starting quorum peer [YYYY-MM-DD 12:56:56,135] INFO binding to port 0.0.0.0/0.0.0.0:3181 [YYYY-MM-DD 12:56:56,184] INFO tickTime set to 2000 [YYYY-MM-DD 12:56:56,185] INFO minSessionTimeout set to -1 [YYYY-MM-DD 12:56:56,185] INFO maxSessionTimeout set to -1 [YYYY-MM-DD 12:56:56,185] INFO initLimit set to 10 [YYYY-MM-DD 12:56:56,211] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.2a000000a9 [YYYY-MM-DD 12:56:56,375] INFO My election bind port: /yyy.yyy.93.88:3888 [YYYY-MM-DD 12:56:56,378] INFO LOOKING (org.apache.zookeeper.server.quorum.QuorumPeer) [YYYY-MM-DD 12:56:56,380] INFO New election. My id = 2, proposed zxid=0x310000003d [YYYY-MM-DD 12:56:56,394] INFO Notification: 1 (message format version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:56:56,396] WARN Cannot open channel to 3 at election address /yyy.yyy.97.221:3888 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) ... [YYYY-MM-DD 12:56:56,398] INFO Notification: 1 (message format version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:56:56,399] INFO Notification: 1 (message format version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:56:56,601] INFO LEADING [YYYY-MM-DD 12:56:56,605] INFO TCP NoDelay set to: true [YYYY-MM-DD 12:56:56,615] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [YYYY-MM-DD 12:56:56,615] INFO Server environment:host.name=SBT-IPO-204.youre.adress.ru [YYYY-MM-DD 12:56:56,615] INFO Server environment:java.version=1.8.0_66 [YYYY-MM-DD 12:56:56,615] INFO Server environment:java.home=/usr/java/jre1.8.0_66 [YYYY-MM-DD 12:56:56,615] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/.... [YYYY-MM-DD 12:56:56,616] INFO Server environment:java.library.path=/usr/java/... [YYYY-MM-DD 12:56:56,616] INFO Server environment:java.io.tmpdir=/tmp [YYYY-MM-DD 12:56:56,616] INFO Server environment:java.compiler=\<NA\> [YYYY-MM-DD 12:56:56,616] INFO Server environment:os.name=Linux [YYYY-MM-DD 12:56:56,616] INFO Server environment:os.arch=amd64 [YYYY-MM-DD 12:56:56,616] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [YYYY-MM-DD 12:56:56,616] INFO Server environment:user.name=mon99usr [YYYY-MM-DD 12:56:56,616] INFO Server environment:user.home=/home/mon99usr [YYYY-MM-DD 12:56:56,616] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [YYYY-MM-DD 12:56:56,618] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [YYYY-MM-DD 12:56:56,619] INFO LEADING - LEADER ELECTION TOOK - 239 [YYYY-MM-DD 12:56:57,627] INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$... [YYYY-MM-DD 12:56:57,638] INFO Synchronizing with Follower sid: 1 maxCommittedLog=0x310000003d minCommittedLog=0x2e0000006f peerLastZxid=0x310000003d [YYYY-MM-DD 12:56:57,638] INFO Sending DIFF [YYYY-MM-DD 12:56:57,666] INFO Received NEWLEADER-ACK message from 1 [YYYY-MM-DD 12:56:57,669] INFO Have quorum of supporters, sids: [ 1,2]; starting up and setting last processed zxid: .. [YYYY-MM-DD 12:57:04,000] INFO Expiring session 0x253bc816e870000, timeout of 6000ms exceeded [YYYY-MM-DD 12:57:04,001] INFO Processed session termination for sessionid: 0x253bc816e870000 [YYYY-MM-DD 12:57:04,002] INFO Creating new log file: log.3200000001 [YYYY-MM-DD 12:57:06,475] INFO Received connection request /yyy.yyy.97.221:34164 [YYYY-MM-DD 12:57:06,477] INFO Notification: 1 (message format version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) LEADING (my state) [YYYY-MM-DD 12:57:06,513] INFO Follower sid: 3 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer@... [YYYY-MM-DD 12:57:06,517] INFO Synchronizing with Follower sid: 3 maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070 peerLastZxid=0x310000003d [YYYY-MM-DD 12:57:06,518] INFO Sending DIFF [YYYY-MM-DD 12:57:06,540] INFO Received NEWLEADER-ACK message from 3Запуск брокера на сервере
Server-2выполнен успешно.Краткий анализ логов
Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере yyy.yyy.97.221, т. к. Брокер на этом сервере еще не запущен:
INFO LOOKING WARN Cannot open channel to 3 at election address /yyy.yyy.97.221:3888 java.net.ConnectException: Connection refusedЛидером выбран Брокер на сервере
Server-2(на выбор лидера ушло 239 миллисекунд). От Брокера на сервереServer-1получено соответствующее подтверждение. Выведено сообщение, что создан новый кворум [1,2] из двух источников-брокеров с идентификаторамиmyid=1и2соответственно.INFO LEADING INFO LEADING - LEADER ELECTION TOOK -- 239 INFO Have quorum of supporters, sids: [ 1,2 ];Синхронизация Брокера-последователя на сервере
Server-1с Брокером-лидером на сервереServer-2. Брокер-лидер отправляет текущее состояние системы (DIFF) и тем самым запускает синхронизацию. После обработкиDIFFБрокером-последователем (сохраненияDIFFв виде образа (snapshot) текущего состояния системы) от Брокера-последователя поступает соответствующее подтверждение о полученииReceived NEWLEADER-ACK:INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.Q... INFO Synchronizing with Follower sid: 1 INFO Sending DIFF INFO Received NEWLEADER-ACK message from 1Принят запрос на соединение от нового Брокера на сервере
Server-3по адресу yyy.yyy.97.221 в связи со запуском этого Брокера:INFO Received connection request /yyy.yyy.97.221:34164Новый Брокер с идентификатором
myid=3принят в кворум в качестве Брокера-последователя:INFO Follower sid: 3Синхронизация Брокера-последователя на сервере
Server-3с Брокером-лидером на сервереServer-2. Брокеру-последователю отправлено текущее состояние системы (DIFF), успешно обработано Брокером-последователем, в ответ выслано соответствующее подтверждение:INFO Synchronizing with Follower sid: 3 maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070 peerLastZxid=0x310000003d [YYYY-MM-DD 12:57:06,518] INFO Sending DIFF [YYYY-MM-DD 12:57:06,540] INFO Received NEWLEADER-ACK message from 3
Выполните запуск узла Zookeeper на сервере
Server-3в режиме демона с выводом логов:> bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [mon99usr\@SBT-IPO-208 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [YYYY-MM-DD 12:57:07,273] INFO Reading configuration from: config/zookeeper.properties [YYYY-MM-DD 12:57:07,276] INFO Defaulting to majority quorums [YYYY-MM-DD 12:57:07,282] INFO autopurge.snapRetainCount set to 3 [YYYY-MM-DD 12:57:07,282] INFO autopurge.purgeInterval set to 0 [YYYY-MM-DD 12:57:07,282] INFO Purge task is not scheduled. [YYYY-MM-DD 12:57:07,303] INFO Starting quorum peer [YYYY-MM-DD 12:57:07,321] INFO binding to port 0.0.0.0/0.0.0.0:3181 [YYYY-MM-DD 12:57:07,357] INFO tickTime set to 2000 [YYYY-MM-DD 12:57:07,357] INFO minSessionTimeout set to -1 [YYYY-MM-DD 12:57:07,357] INFO maxSessionTimeout set to -1 [YYYY-MM-DD 12:57:07,357] INFO initLimit set to 10 [YYYY-MM-DD 12:57:07,378] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac [YYYY-MM-DD 12:57:07,430] INFO My election bind port: /yyy.yyy.97.221:3888 [YYYY-MM-DD 12:57:07,444] INFO LOOKING [YYYY-MM-DD 12:57:07,445] INFO New election. My id = 3, proposed zxid=0x31000003d [YYYY-MM-DD 12:57:07,459] INFO Notification: 1 (message format version), 3 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31(n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:57:07,460] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31(n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:57:07,460] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), FOLLOWING (n.state), 1 (n.sid), 0x2 (n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:57:07,461] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x31(n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:57:07,462] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LEADING (n.state), 2 (n.sid), 0x32(n.peerEpoch) LOOKING (my state) [YYYY-MM-DD 12:57:07,462] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer) [YYYY-MM-DD 12:57:07,469] INFO TCP NoDelay set to: true [YYYY-MM-DD 12:57:07,478] INFO Server environment:zookeeper.version=3.4.6-156995, built on 02/20/2014 09:09 GMT [YYYY-MM-DD 12:57:07,478] INFO Server environment:host.name=SBT-IPO-208.youre.adress.ru [YYYY-MM-DD 12:57:07,487] INFO Server environment:java.version=1.8.0_71 [YYYY-MM-DD 12:57:07,487] INFO Server environment:java.home=/usr/java/jre1.8.0_1 [YYYY-MM-DD 12:57:07,487] INFO Server environment:java.class.path=:/home/mon99ur/... [YYYY-MM-DD 12:57:07,488] INFO Server environment:java.library.path=/usr/java/packages/... [YYYY-MM-DD 12:57:07,488] INFO Server environment:java.io.tmpdir=/tmp [YYYY-MM-DD 12:57:07,488] INFO Server environment:java.compiler=\<NA\> [YYYY-MM-DD 12:57:07,488] INFO Server environment:os.name=Linux [YYYY-MM-DD 12:57:07,488] INFO Server environment:os.arch=amd64 [YYYY-MM-DD 12:57:07,488] INFO Server environment:os.version=3.10.0-123.9.3.el7x86_64 [YYYY-MM-DD 12:57:07,488] INFO Server environment:user.name=mon99usr [YYYY-MM-DD 12:57:07,488] INFO Server environment:user.home=/home/mon99usr [YYYY-MM-DD 12:57:07,488] INFO Server environment:user.dir=/home/mon99usr/kafka2.11-0.9.0.0 [YYYY-MM-DD 12:57:07,490] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [YYYY-MM-DD 12:57:07,491] INFO FOLLOWING - LEADER ELECTION TOOK - 46 [YYYY-MM-DD 12:57:07,502] INFO Getting a diff from the leader 0x3200000001 [YYYY-MM-DD 12:57:07,506] WARN Got zxid 0x3200000001 expected 0x1 [YYYY-MM-DD 12:57:07,508] INFO Snapshotting: 0x3200000001 to /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001Запуск брокера на сервере
Server-3выполнен успешно.Краткий анализ логов
Поиск других Брокеров с целью выбора лидера, инициация алгоритма выбора лидера, предложение своей кандидатуры в качестве лидера аналогичны инициализации двух предыдущих Брокеров:
INFO LOOKING INFO New election. My id = 3, proposed zxid=0x31000003dБрокер на сервере
Server-3выбран в качестве Брокера-последователя:INFO FOLLOWING INFO FOLLOWING - LEADER ELECTION TOOK -- 46Синхронизация с Брокером на сервере
Server-2. Получение состояния системы и сохранение образа:INFO Getting a diff from the leader 0x3200000001 INFO Snapshotting: 0x3200000001 to /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001
Конфигурация кластера Corax из трех узлов
Каждый узел расположен на отдельном сервере.
В целях тестирования доступности сервиса Zookeeper с различным количеством работающих узлов сконфигурируйте кластер Corax:
На каждом из серверов создайте директорию, где будут храниться логи Corax:
mkdir --p /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/Создайте файл конфигурации
\$KAFKA_HOME/config/server.configдля каждого Corax-брокера.Для сервера
Server-1:broker.id=1 host.name=yyy.yyy.93.87 log.dirs=/var/tmp/kafka_2.11-0.9.0.0/kafka-logs zookeeper.connect=yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181Для сервера
Server-2:broker.id=2 host.name=yyy.yyy.93.88 log.dirs=/var/tmp/kafka_2.11-0.9.0.0/kafka-logs zookeeper.connect=yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181Для сервера
Server-3:broker.id=3 host.name=yyy.yyy.97.221 log.dirs=/var/tmp/kafka_2.11-0.9.0.0/kafka-logs zookeeper.connect=yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181
Где:
log.dirs— директория, где хранятся логи Corax, которая была создана на шаге (1);host.name— IP-адрес сервера, на котором находится Corax-брокер;broker.id— идентификатор Corax-брокера, обязательно должен быть уникальным в рамках кластера Corax;zookeeper.connect— список адресов Zookeeper-брокеров в форматеIP:PORT, где:IP— адрес сервера, на котором выполняется Zookeeper-брокер;PORT— порт, к которому подключаются клиенты Zookeeper — егоclientPort.
Примечание
Свойство
portне указано в файлах, т. к. нет необходимости прописывать его явно:port=9092используется Corax по умолчанию.При необходимости значение порта можно прописать явно или изменить на другое.
Запуск кластера Corax из трех узлов
Выполните запуск узла Corax на сервере
Server-1в режиме демона с выводом логов:bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [YYYY-MM-DD 17:16:50,514] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 11 milliseconds. [YYYY-MM-DD 17:16:50,531] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:16:50,532] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:16:50,537] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [YYYY-MM-DD 17:16:50,549] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:16:50,555] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:16:50,556] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(yyy.yyy.93.87,9092,PLAINTEXT) (kafka.utils.ZkUtils) [YYYY-MM-DD 17:16:50,571] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [YYYY-MM-DD 17:17:06,309] INFO KafkaConfig values: advertised.host.name = null metric.reporters = [] quota.producer.default = 9223372036854775807 offsets.topic.num.partitions = 50 log.flush.interval.messages = 9223372036854775807 auto.create.topics.enable = true controller.socket.timeout.ms = 30000 log.flush.interval.ms = null principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder replica.socket.receive.buffer.bytes = 65536 min.insync.replicas = 1 replica.fetch.wait.max.ms = 500 num.recovery.threads.per.data.dir = 1 ssl.keystore.type = JKS default.replication.factor = 1 ssl.truststore.password = null log.preallocate = false sasl.kerberos.principal.to.local.rules = [DEFAULT] fetch.purgatory.purge.interval.requests = 1000 ssl.endpoint.identification.algorithm = null replica.socket.timeout.ms = 30000 message.max.bytes = 1000012 num.io.threads = 8 offsets.commit.required.acks = -1 log.flush.offset.checkpoint.interval.ms = 60000 delete.topic.enable = false quota.window.size.seconds = 1 ssl.truststore.type = JKS offsets.commit.timeout.ms = 5000 quota.window.num = 11 zookeeper.connect = yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181 authorizer.class.name = num.replica.fetchers = 1 log.retention.ms = null log.roll.jitter.hours = 0 log.cleaner.enable = false offsets.load.buffer.size = 5242880 log.cleaner.delete.retention.ms = 86400000 ssl.client.auth = none controlled.shutdown.max.retries = 3 queued.max.requests = 500 offsets.topic.replication.factor = 3 log.cleaner.threads = 1 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 socket.request.max.bytes = 104857600 ssl.trustmanager.algorithm = PKIX zookeeper.session.timeout.ms = 6000 log.retention.bytes = -1 sasl.kerberos.min.time.before.relogin = 60000 zookeeper.set.acl = false connections.max.idle.ms = 600000 offsets.retention.minutes = 1440 replica.fetch.backoff.ms = 1000 inter.broker.protocol.version = 0.9.0.X log.retention.hours = 168 num.partitions = 1 listeners = null ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] log.roll.ms = null log.flush.scheduler.interval.ms = 9223372036854775807 ssl.cipher.suites = null log.index.size.max.bytes = 10485760 ssl.keymanager.algorithm = SunX509 security.inter.broker.protocol = PLAINTEXT replica.fetch.max.bytes = 1048576 advertised.port = null log.cleaner.dedupe.buffer.size = 524288000 replica.high.watermark.checkpoint.interval.ms = 5000 log.cleaner.io.buffer.size = 524288 sasl.kerberos.ticket.renew.window.factor = 0.8 zookeeper.connection.timeout.ms = null controlled.shutdown.retry.backoff.ms = 5000 log.roll.hours = 168 log.cleanup.policy = delete host.name = yyy.yyy.93.87 log.roll.jitter.ms = null max.connections.per.ip = 2147483647 offsets.topic.segment.bytes = 104857600 background.threads = 10 quota.consumer.default = 9223372036854775807 request.timeout.ms = 30000 log.index.interval.bytes = 4096 log.dir = /tmp/kafka-logs log.segment.bytes = 1073741824 log.cleaner.backoff.ms = 15000 offset.metadata.max.bytes = 4096 ssl.truststore.location = null group.max.session.timeout.ms = 30000 ssl.keystore.password = null zookeeper.sync.time.ms = 2000 port = 9092 log.retention.minutes = null log.segment.delete.delay.ms = 60000 log.dirs = /var/tmp/kafka_2.11-0.9.0.0/kafka-logs controlled.shutdown.enable = true compression.type = producer max.connections.per.ip.overrides = sasl.kerberos.kinit.cmd = /usr/bin/kinit log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 auto.leader.rebalance.enable = true leader.imbalance.check.interval.seconds = 300 log.cleaner.min.cleanable.ratio = 0.5 replica.lag.time.max.ms = 10000 num.network.threads = 3 ssl.key.password = null reserved.broker.max.id = 1000 metrics.num.samples = 2 socket.send.buffer.bytes = 102400 ssl.protocol = TLS socket.receive.buffer.bytes = 102400 ssl.keystore.location = null replica.fetch.min.bytes = 1 unclean.leader.election.enable = true group.min.session.timeout.ms = 6000 log.cleaner.io.buffer.load.factor = 0.9 offsets.retention.check.interval.ms = 600000 producer.purgatory.purge.interval.requests = 1000 metrics.sample.window.ms = 30000 broker.id = 1 offsets.topic.compression.codec = 0 log.retention.check.interval.ms = 300000 advertised.listeners = null leader.imbalance.per.broker.percentage = 10 (kafka.server.KafkaConfig) [YYYY-MM-DD 17:17:06,377] INFO starting (kafka.server.KafkaServer) [YYYY-MM-DD 17:17:06,382] INFO Connecting to zookeeper on yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181 (kafka.server.KafkaServer) [YYYY-MM-DD 17:17:06,748] INFO Loading logs. (kafka.log.LogManager) [YYYY-MM-DD 17:17:06,986] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-0/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log) [YYYY-MM-DD 17:17:06,987] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log) [YYYY-MM-DD 17:17:06,988] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log) [YYYY-MM-DD 17:17:06,990] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-1/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log) [YYYY-MM-DD 17:17:06,991] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log) [YYYY-MM-DD 17:17:06,992] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log) [YYYY-MM-DD 17:17:06,994] INFO Logs loading complete. (kafka.log.LogManager) [YYYY-MM-DD 17:17:06,995] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [YYYY-MM-DD 17:17:06,996] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [YYYY-MM-DD 17:17:07,040] INFO Awaiting socket connections on SBT-IPO-203.youre.adress.ru:9092. (kafka.network.Acceptor) [YYYY-MM-DD 17:17:07,043] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer) [YYYY-MM-DD 17:17:07,062] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:17:07,063] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:17:07,124] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:17:07,142] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:17:07,142] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [YYYY-MM-DD 17:17:09,020] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener) [YYYY-MM-DD 17:17:09,051] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.GroupCoordinator) [YYYY-MM-DD 17:17:09,053] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:17:09,053] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.GroupCoordinator) [YYYY-MM-DD 17:17:09,057] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:17:09,060] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.GroupMetadataManager) [YYYY-MM-DD 17:17:09,075] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:17:09,084] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [YYYY-MM-DD 17:17:09,128] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:17:09,138] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:17:09,139] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -\> EndPoint(yyy.yyy.93.87,9092,PLAINTEXT) (kafka.utils.ZkUtils) [YYYY-MM-DD 17:17:09,151] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [YYYY-MM-DD 17:17:09,409] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [NUMBERS,0],[NUMBERS,1] (kafka.server.ReplicaFetcherManager) [YYYY-MM-DD 17:17:10,584] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions[NUMBERS,0] (kafka.server.ReplicaFetcherManager)Где
tail -f logs/server.log— вывод текущего лога Corax-брокера из файла в реальном времени.Запуск Corax-брокера на сервере
Server-1выполнен успешно.Выполните запуск узла Zookeeper на сервере
Server-2в режиме демона с выводом логов:bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [YYYY-MM-DD 17:35:35,726] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,36] [YYYY-MM-DD 17:35:35,728] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,36] in 2 milliseconds. [YYYY-MM-DD 17:36:11,334] INFO KafkaConfig values: .... (kafka.server.KafkaConfig) [YYYY-MM-DD 17:36:11,441] INFO starting (kafka.server.KafkaServer) [YYYY-MM-DD 17:36:11,450] INFO Connecting to zookeeper on yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181 (kafka.server.KafkaServer) [YYYY-MM-DD 17:36:11,870] INFO Loading logs. (kafka.log.LogManager) [YYYY-MM-DD 17:36:12,775] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-1/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log) [YYYY-MM-DD 17:36:12,776] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log) [YYYY-MM-DD 17:36:12,777] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log) [YYYY-MM-DD 17:36:12,780] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-0/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log) [YYYY-MM-DD 17:36:12,781] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log) [YYYY-MM-DD 17:36:12,782] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log) [YYYY-MM-DD 17:36:12,785] INFO Logs loading complete. (kafka.log.LogManager) [YYYY-MM-DD 17:36:12,786] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [YYYY-MM-DD 17:36:12,799] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [YYYY-MM-DD 17:36:12,858] INFO Awaiting socket connections on SBT-IPO-204.youre.adress.ru:9092. (kafka.network.Acceptor) [YYYY-MM-DD 17:36:12,861] INFO [Socket Server on Broker 2], Started 1 acceptor threads (kafka.network.SocketServer) [YYYY-MM-DD 17:36:13,054] INFO [GroupCoordinator 2]: Starting up. (kafka.coordinator.GroupCoordinator) [YYYY-MM-DD 17:36:13,064] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:36:13,064] INFO [GroupCoordinator 2]: Startup complete. (kafka.coordinator.GroupCoordinator) [YYYY-MM-DD 17:36:13,073] INFO [Group Metadata Manager on Broker 2]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager) [YYYY-MM-DD 17:36:13,075] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:36:13,087] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:36:13,088] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:36:13,094] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [YYYY-MM-DD 17:36:13,119] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:36:13,133] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:36:13,135] INFO Registered broker 2 at path /brokers/ids/2 with addresses: PLAINTEXT -\> EndPoint(yyy.yyy.93.88,9092,PLAINTEXT) (kafka.utils.ZkUtils) [YYYY-MM-DD 17:36:13,149] INFO [Kafka Server 2], started (kafka.server.KafkaServer) [YYYY-MM-DD 17:36:15,082] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test2,1],[test,14],[test2,13],[test,15],[test,3],[test2,11],[test,6],[test5,1],[test2,5],[ThrashMetal,2],[test,17],[test5,0],[test,5],[test,11],[test3,1],[test2,15],[test2,18],[test2,8],[test,16],[test,1],[NUMBERS,0],[test,4],[test,19],[test2,3],[test2,10],[ThrashMetal,0],[test,18],[test,13],[test2,2],[test2,17],[test,12],[test2,7],[test,2],[test,8],[test2,16],[NUMBERS,1],[test2,6],[test,0],[test2,4],[test2,0],[test,7],[test2,9],[test,9],[superTest,1],[test3,0],[test2,14],[ThrashMetal,1],[superTest,0],[test2,19],[test,10],[test2,12] (kafka.server.ReplicaFetcherManager) [YYYY-MM-DD 17:36:15,096] INFO Truncating log NUMBERS-0 to offset 13. (kafka.log.Log) [YYYY-MM-DD 17:36:15,100] INFO Truncating log NUMBERS-1 to offset 14. (kafka.log.Log) [YYYY-MM-DD 17:36:15,162] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([[NUMBERS,1], ...) initOffset 14 to broker BrokerEndPoint(1,yyy.yyy.93.87,9092)] [YYYY-MM-DD 17:36:15,171] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread) [YYYY-MM-DD 17:36:15,354] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,22] (kafka.coordinator.GroupMetadataManager) [YYYY-MM-DD 17:36:15,424] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,22] in 69 milliseconds. (kafka.coordinator.GroupMetadataManager) [YYYY-MM-DD 17:37:14,092] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [NUMBERS,1] (kafka.server.ReplicaFetcherManager)Запуск Corax-брокера на сервере
Server-2выполнен успешно.Выполните запуск узла Corax на сервере
Server-3в режиме демона с выводом логов:bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [mon99usr\@SBT-IPO-208 kafka_2.11-0.9.0.0]\$ bin/kafka-server-start.sh -daemon config/server-3-servers.properties && tail -f logs/server.log Kafka application log tail -f from /home/mon99usr/kafka_2.11-0.9.0.0/logs/server.log [ [YYYY-MM-DD 17:45:56,259] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:45:56,280] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:45:56,281] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(yyy.yyy.97.221,9092,PLAINTEXT) (kafka.utils.ZkUtils) [YYYY-MM-DD 17:45:56,300] INFO [Kafka Server 3], started (kafka.server.KafkaServer) [YYYY-MM-DD 17:50:45,138] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener) [YYYY-MM-DD 17:50:57,090] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:50:57,096] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:50:57,097] INFO 3 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [YYYY-MM-DD 17:51:00,733] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener) [YYYY-MM-DD 17:52:37,509] INFO KafkaConfig values: ... (kafka.server.KafkaConfig) [YYYY-MM-DD 17:52:37,675] INFO starting (kafka.server.KafkaServer) [YYYY-MM-DD 17:52:37,686] INFO Connecting to zookeeper on yyy.yyy.93.87:3181,yyy.yyy.93.88:3181,yyy.yyy.97.221:3181 (kafka.server.KafkaServer) [YYYY-MM-DD 17:52:38,083] INFO Loading logs. [YYYY-MM-DD 17:52:38,090] INFO Logs loading complete. [YYYY-MM-DD 17:52:38,091] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [YYYY-MM-DD 17:52:38,103] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [YYYY-MM-DD 17:52:38,163] INFO Awaiting socket connections on SBT-IPO-208.youre.adress.ru:9092. (kafka.network.Acceptor) [YYYY-MM-DD 17:52:38,169] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer) [YYYY-MM-DD 17:52:38,199] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:52:38,203] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:52:38,378] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator) [YYYY-MM-DD 17:52:38,394] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 13 milliseconds. (kafka.coordinator.GroupMetadataManager) [YYYY-MM-DD 17:52:38,396] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:52:38,397] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator) [YYYY-MM-DD 17:52:38,407] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [YYYY-MM-DD 17:52:38,432] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:52:38,437] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [YYYY-MM-DD 17:52:38,445] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [YYYY-MM-DD 17:52:38,485] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:52:38,504] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [YYYY-MM-DD 17:52:38,506] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -\> EndPoint(yyy.yyy.97.221,9092,PLAINTEXT) (kafka.utils.ZkUtils) [YYYY-MM-DD 17:52:38,524] INFO [Kafka Server 3], started (kafka.server.KafkaServer)Запуск Corax-брокера на сервере
Server-3выполнен успешно.
Запуск кластеров Corax и Zookeeper выполнен успешно.
Тест доступности Zookeeper / работоспособности Corax при различных условиях работы кластера Zookeeper
Тест работоспособности Corax при всех работающих узлах кластера Zookeeper.
Вывод списка топиков:
bin/kafka-topics.sh ---list --zookeeper yyy.yyy.93.87:3181,yyy.yyy.93.87:3181,yyy.yyy.93.87:3181 ---topic NUMBERS NUMBERSВ списке содержится топик
NUMBERS.Отправка сообщений в топик
NUMBERS:bin/kafka-console-producer.sh --broker-list yyy.yyy.93.87:9092,yyy.yyy.93.88:9092,yyy.yyy.97.221:9092 --topic NUMBERS 1 2 3Запись в топик успешна.
Чтение сообщений из топика:
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server yyy.yyy.93.88:9092 --topic NUMBERS --from-beginning 1 2 3Чтение из топика успешно.
Тест работоспособности Corax при отказе одного узла кластера Zookeeper:
список топиков выводится;
чтение сообщений из топика выполняется успешно;
отправка сообщений в топик выполняется успешно.
Тест работоспособности Corax при отказе двух узлов кластера Zookeeper:
список топиков не выводится;
чтение сообщений из топика выполняется успешно;
отправка сообщений в топик выполняется успешно.
Тест работоспособности Corax при полном отказе кластера Zookeeper:
список топиков не выводится;
чтение сообщений из топика выполняется успешно;
отправка сообщений в топик выполняется успешно.
Конфигурирование Corax#
Конфигурирование узла (брокера) Corax#
Конфигурирование брокера Corax осуществляется в файле
config/server.properties.
Допускается создание файлов конфигурации с различными именами, например:
config/server-1-node-cluster.properties
config/server-3-node-cluster.properties
config/server-3-node-cluster-on-1-machine.properties
config/server-5-node-cluster.properties
При запуске Corax достаточно указать нужный конфигурационный файл.
Минимальная конфигурация#
Пример файла конфигураций server.properties для одного брокера:
broker.id=1
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs
Шаги создания конфигурационного файла:
Присвойте ключу
broker.idнеобходимый числовой идентификатор. Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.Пример
broker.id=1Для ключа
zookeeper.connectукажитеIP:PORTсервиса Zookeeper.Пример
Запущен один экземпляр Zookeeper и его порт
clientPort=2181:zookeeper.connect=localhost:2181Пример
Запущен кластер из нескольких экземпляров Zookeeper (адреса всех служб Zookeeper указаны через запятую):
zookeeper.connect=xxx.xxx.1.1:2181,xxx.xxx.1.2:2181Для ключа
log.dirsзадайте одну или несколько директорий, в которых планируется хранение сообщений, передаваемых в Corax.Пример
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logsПримечание
Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.
Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logsДля ключа
listenersприсвойте строку формата<ПРОТОКОЛ>://<хост>:<порт>:Пример
listeners=PLAINTEXT://localhost:9092 listeners=SSL://yyy.yyy.93.88:9092 listeners=SASL_PLAINTEXT://:9092
Расширенная конфигурация#
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Наиболее важные из них приведены далее.
Если на одном физическом сервере планируется запускать несколько брокеров Corax, то для каждого брокера необходимо также указать уникальный порт, заданный в ключе
port. По умолчанию сервер Corax слушает порт9092.Можно изменить значение количества партиций для вновь создаваемых топиков по умолчанию в ключе
num.partitions. Если для сервера Corax доступны несколько жестких дисков, увеличение числа партиций увеличит производительность Corax. Количество партиций по умолчанию можно указать равным числу дисков.Пример:
num.partitions=4Для повышения надежности необходимо раскомментировать записи, содержащие ключи
log.flush.interval.messagesиlog.flush.interval.ms. В этом случае указывается, как часто должна происходить реальная запись сообщений на диск: либо по истечении заданного времени, либо после достижения заданного количества незаписанных сообщений. Данную настройку можно изменить для конкретного топика при его создании.Пример:
log.flush.interval.messages=10000 log.flush.interval.ms=1000Для ключа
log.retention.hoursзадается время хранения партиций в часах, по истечении которого партиция будет доступна для удаления. В данном случае отсчет будет вестись по самому новому сообщению в партиции.Пример
log.retention.hours=168Для ключа
log.cleaner.enableзадается значениеtrueдля активации механизма удаления старых партиций.Пример
log.cleaner.enable=true
Запуск узлов кластера Corax#
Выполнив вышеуказанные настройки, можно запускать узлы кластера Corax.
Отдельные узлы можно запускать в любое время, т. е. запуск отдельного узла сервера Corax не зависит от остальных, как это было с кластером Zookeeper.
Запуск каждого отдельного сервера Corax выполняется командой:
bin/kafka-server-start.sh config/server.properties
При запуске всех узлов кластера Corax в консолях запуска должны быть следующие записи:
INFO [Kafka Server ID], started (kafka.server.KafkaServer)
Где ID — идентификатор сервера Corax, указанный в ключе broker.id.
Сетевые настройки#
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
message.max.bytes |
Максимальный размер сообщений, принимаемых сервером. Пример: message.max.bytes=1000012 |
int |
1000012 |
num.network.threads |
Количество сетевых потоков, используемых сервером для обработки сетевых запросов. Пример: num.network.threads=3 |
int |
3 |
queued.max.requests |
Лимит запросов. Если лимит превышен, начнется блокировка сетевых запросов. Пример: message.max.bytes=500 |
int |
500 |
replica.socket.receive.buffer.bytes |
Размер (в байтах) буфера сокета для приема сетевых запросов. Пример: replica.socket.receive.buffer.bytes=65536 |
int |
65536 |
replica.socket.timeout.ms |
Тайм-аут (в миллисекундах) сокета для сетевых запросов. Пример: replica.socket.timeout.ms=102400 |
int |
30000 |
request.timeout.ms |
Максимальное количество времени, которое уходит на ожидание клиентом ответа на отправленный запрос. Если в течение этого времени ответ на запрос не получен, клиент отправит повторный запрос. Если лимит повторных запросов превышен, то запросы будут прекращены. Пример: request.timeout.ms=102400 |
int |
30000 |
socket.receive.buffer.bytes |
Размер буфера в байтах на прием данных через сокет. Пример: socket.receive.buffer.bytes=102400 |
int |
102400 |
socket.request.max. bytes |
Максимальное количество байтов на сокет-запрос. Пример: socket.request.max.bytes=104857600 |
int |
104857600 |
socket.send.buffer.bytes |
Размер буфера в байтах на передачу данных через сокет. Пример: socket.send.buffer.bytes =102400 |
int |
102400 |
connections.max.idle.ms |
Тайм-аут неудачных попыток соединения в миллисекундах. Пример: connections.max.idle.ms=60000 |
long |
60000 |
controller.socket.timeout.ms |
Тайм-аут сокета в миллисекундах для Controller-to-Broker каналов. Пример: controller.socket.timeout.ms=30000 |
int |
30000 |
max.connections.per.ip |
Максимальное количество соединений с каждого IP-адреса. Пример: max.connections.per.ip=2147483647 |
int |
2147483647 |
max.connections.per.ip.overrides |
Настройка, позволяющая переопределять заданное максимальное количество соединений max.connections.per.ip, варьируя его для каждого IP-адреса. Пример: max.connections.per.ip.overrides=yyy.yyy.93.87:5,yyy.yyy.93.88:,yyy.yyy.97.221:45 |
string |
"" |
Настройки подключения к Zookeeper#
Запуск Corax невозможен без подключения к Zookeeper.
Для подключения Corax-брокеров к кластеру Zookeeper в файле конфигураций server.properties каждого Corax-брокера в свойство zookeeper.connect необходимо прописать список всех хостов Zookeeper-брокеров в формате IP:PORT, разделяя их запятыми. Где:
IP— адрес хоста, на котором работает Zookeeper-брокер;PORT— порт (clientPortв файлеzookeeper.properties) для подключения клиентов Zookeeper — Corax-брокеров.
Для всех остальных настроек выставятся значения по умолчанию, если они не будут указаны явно в файле конфигураций. Список всех известных настроек Zookeeper, относящихся к файлу конфигураций server.properties Corax-брокера, и их значений по умолчанию приведен в таблице ниже.
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
zookeeper.connect |
Список адресов Zookeeper-брокеров в формате |
string |
"" |
zookeeper.connection.timeout.ms |
Максимальное время ожидания клиентом соединения с Zookeeper. Если это значение не выставлено, то используется zookeeper.session.timeout.ms. Пример: zookeeper.connection.timeout.ms=6000 |
int |
null |
zookeeper.session.timeout.ms |
Тайм-аут сессии Zookeeper в миллисекундах. Пример: zookeeper.session.timeout.ms=6000 |
int |
6000 |
zookeeper.set.acl |
Принудить клиента использовать безопасные ACL (Access Control Lists — списки контроля доступа). Пример: zookeeper.set.acl=false |
boolean |
false |
zookeeper.sync.time.ms |
Продолжительность времени в миллисекундах, через которое происходит синхронизация Zookeeper-последователя с Zookeeper-лидером. Пример: zookeeper.sync.time.ms=2000 |
int |
2000 |
Настройки диска#
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
log.dir |
Одна директория, в которой хранятся данные Corax — логи сообщений. В случае если в файле конфигураций не был указан путь хранения данных, Corax разместит логи в /tmp/kafka-logs. Пример: log.dir=/tmp/kafka-logs |
string |
/tmp/kafka-logs |
log.dirs |
Несколько директорий, в которых хранятся данные Corax — логи сообщений. Это свойство является дополнением к свойству log.dirs. Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории. Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую. Пример: log.dirs=/var/log/kafka/kafka-logs,/another\disk/kafka/kafka-logs |
string |
null |
log.flush.interval.messages |
Интервал, выражаемый количеством сообщений, накапливающихся в логе партиции до момента сохранения этого лога на диск. Если в логе накопилось сообщений меньше, чем задано в этом свойстве, то в случае сбоя (например, перезапуска ОС) эти сообщения будут потеряны на данной машине, поскольку хранились в оперативной памяти и не были сохранены на диск. Пример: log.flush.interval.messages=10000 |
long |
9223372036854775807 |
log.flush.interval.ms |
Максимальное время (в миллисекундах) хранения сообщения любого топика в оперативной памяти до того, как оно будет сохранено на диск. Если свойство не задано, будет использоваться значение из другого свойства — log.flush.scheduler.interval.ms. Пример: log.flush.interval.ms=1000 |
long |
null |
log.flush.offset.checkpoint.interval.ms |
Частота, с которой происходит обновление персистентной записи, сохраненной в ходе последней сессии записи на диск. Эта частота служит отправной точкой восстановления лога. Пример: log.flush.offset.checkpoint.interval.ms=10 |
int |
60000 |
log.flush.scheduler.interval.ms |
Частота в миллисекундах, с которой данные каждого лога сохраняются на диск. Пример: log.flush.scheduler.interval.ms=10 |
long |
9223372036854775807 |
log.retention.bytes |
Максимальный размер лога перед его удалением. Пример: log.retention.bytes =100 |
long |
-1 |
log.retention.hours |
Время (в часах) хранения лог-файла до его удаления. Пример: log.retention.hours=168 |
int |
168 |
log.retention.minutes |
Время (в минутах) хранения лог-файла до его удаления. Если это свойство не задано, будет использоваться значение из log.retention.hours. Пример: log.retention.minutes=100000 |
int |
null |
log.retention.ms |
Время (в миллисекундах) хранения лог-файла до его удаления (третье по важности после свойства log.retention.minutes). Если это свойство не задано, будет использоваться значение из log.retention.minutes. Пример: log.retention.ms=6000 |
long |
null |
log.roll.hours |
Циркуляция логов (в часах) — максимальное количество времени до записи нового сегмента лога. Это свойство имеет более низкий приоритет, чем аналогичное свойство log.roll.ms. Пример: log.roll.hours=10 |
int |
168 |
log.roll.jitter.hours |
Максимальное случайное отклонение (для циркуляции логов) (в часах) для вычитания из значения logRollTimeMillis (в миллисекундах). Вторично по отношению к свойству log.roll.jitter.ms. Пример: log.roll.jitter.hours=1 |
int |
0 |
log.roll.ms |
Циркуляция логов (в миллисекундах) — максимальное количество времени до записи нового сегмента лога. Если свойство не задано, используется значение из log.roll.hours. Пример: log.roll.ms=10 |
long |
null |
log.segment.bytes |
Максимальный размер одного лог-файла. Пример: log.segment.bytes=250 |
int |
1073741824 |
log.segment.delete.delay.ms |
Количество времени (в миллисекундах) до удаления файла из файловой системы. Пример: log.roll.delete.delay.ms=10 |
long |
60000 |
log.cleaner.backoff.ms |
Количество времени на простаивание до появления логов для удаления. Пример: log.cleaner.backoff.ms=10 |
long |
15000 |
log.cleaner.dedupe.buffer.size |
Общий размер памяти, выделяемой для удаления дубликатов во всех потоках службы «Log Cleaner». Пример: log.cleaner.dedupe.buffer.size=10 |
long |
134217728 |
log.cleaner.delete.retention.ms |
Количество времени сохранения удаленных записей. Пример: log.cleaner.delete.retention.ms=10 |
long |
86400000 |
log.cleaner.enable |
Включение службы «Log Cleaner». Должно иметь значение true, если есть какой-либо топик со свойством cleanup.policy=compact, включая топик с внутренними смещениями. Если значение установлено в false, то эти топики не будут компактными и будут постоянно увеличиваться в размере. Пример: log.cleaner.enable=true |
boolean |
true |
log.cleaner.io.buffer.load.factor |
Коэффициент загрузки буфера при удалении дубликатов процессом «Log Cleaner». Например, при коэффициенте, равном 0.9, степень загрузки буфера — 90%. Чем больше значение, тем большее количество логов будет удаляться за один раз, однако вместе с этим будет возрастать и число хеш-коллизий. Пример: log.cleaner.io.buffer.load.factor=0.9 |
double |
0.9 |
log.cleaner.io.buffer.size |
Общий объем памяти, используемый службой «Log Cleaner» на операции ввода/вывода для всех потоков этой службы. Пример: log.cleaner.io.buffer.size=255 |
int |
524288 |
log.cleaner.io.max.bytes.per.second |
Ограничение операций ввода/вывода службы «Log Cleaner» так, чтобы сумма байтов при операциях ввода и вывода была всегда меньше значения, указанного в этом свойстве. Пример: log.cleaner.io.max.bytes.per.second=5.0 |
double |
1.7976931348623157E308 |
log.cleaner.min.cleanable.ratio |
Минимальное соотношение между «грязным» и общим логом для определения пригодности лога для удаления. Пример: log.cleaner.min.cleanable.ratio=0.5 |
double |
0.5 |
log.cleaner.threads |
Число фоновых потоков, используемых для чистки логов службой «Log Cleaner». Пример: log.cleaner.threads=1 |
int |
1 |
log.cleanup.policy |
Политика очистки сегментов логов, находящихся за пределами сохраняющего окна «retention window». Возможны два режима: delete — удаление лога, compact — сжатие лога. Пример: log.cleanup.policy=delete |
string |
delete |
log.index.interval.bytes |
Интервал (в байтах), через который прибавляется запись к индексу смещения (offset). Пример: log.index.interval.bytes=4096 |
int |
4096 |
log.index.interval.bytes |
Интервал (в байтах), через который прибавляется запись к индексу смещения (offset). Пример: log.index.interval.bytes=4096 |
int |
4096 |
log.index.size.max.bytes |
Максимальный размер индекса смещения (offset) в байтах. Пример: log.index.size.max.bytes=10485760 |
int |
10485760 |
log.preallocate |
Требуется ли создавать файл при выделении нового сегмента. Для Kafka на Windows рекомендуется установить значение true. Пример: log.preallocate=false |
boolean |
false |
log.retention.check.interval.ms |
Частота в миллисекундах, с которой служба «Log Cleaner» проверяет каждый лог на необходимость его удаления. Пример: log.retention.check.interval.ms=300000 |
long |
300000 |
Настройки топиков#
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
cleanup.policy |
Возможны два режима: delete — удаление лога, compact — сжатие лога. Пример: cleanup.policy=delete |
log.cleanup.policy |
delete |
delete.retention.ms |
log.cleaner.delete.retention.ms |
86400000 (24 hours) |
|
flush.messages |
log.flush.interval.messages |
None |
|
flush.ms |
log.flush.interval.messages |
None |
|
index.interval.bytes |
log.flush.interval.ms |
4096 |
|
max.message.bytes |
message.max.bytes |
1000000 |
|
min.cleanable.dirty.ratio |
log.cleaner.min.cleanable.ratio |
0.5 |
|
min.insync.replicas |
min.insync.replicas |
1 |
|
retention.bytes |
log.retention.bytes |
None |
|
retention.ms |
Максимальное время (в миллисекундах) хранения логов до того, как старые сегменты лога будут удалены для освобождения дискового пространства. Свойство работает только в том случае, если cleanup.policy=compact |
log.retention.ms |
7 days |
segment.bytes |
Размер лог-файла в байтах. Сохраняется и удаляется всегда по одному файлу за один раз |
log.segment.bytes |
1 ГБ |
segment.index.bytes |
Размер индекса, который отображает смещения (offset) на позиции в файлах |
log.index.size.max.bytes |
10 МБ |
segment.ms |
log.roll.hours |
7 days |
|
segment.jitter.ms |
log.roll.jitter.{ms,hours} |
0 |
Конфигурирование кластера Corax#
Конфигурирование кластера серверов Corax, как и конфигурирование одного узла, осуществляется в файле server.properties на каждом сервере, который планируется включить в кластер.
Минимальная конфигурация кластера#
Пример файла конфигураций server.properties для кластера Corax из двух брокеров (приведен пример конфигурации для одного из брокеров):
broker.id=1
host.name=xxx.xxx.1.1
zookeeper.connect=xxx.xxx.1.1:3181,xxx.xxx.1.2:3181
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs
Шаги создания конфигурационного файла:
Присвойте ключу
broker.idнеобходимый числовой идентификатор.Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.
Пример
На первом сервере Corax:
broker.id=1На втором сервере:
broker.id=2Присвойте ключу
host.nameIP-адрес сервера.Пример
host.name=xxx.xxx.1.1Если на одном физическом сервере планируется запускать несколько серверов Corax, то для каждого сервера необходимо также указать уникальный порт, заданный в ключе
port. По умолчанию сервер Corax слушает порт 9092.Для ключа
zookeeper.connectукажите IP и PORT сервиса Zookeeper в форматеzookeeper.connect=IP:PORT. Если кластер Zookeeper запущен, то укажите адреса Zookeeper-сервисов через запятую:Пример
zookeeper.connect=xxx.xxx.1.1:2181,xxx.xxx.1.2:2181Для ключа
log.dirsзадайте директорию, в которой планируется хранение сообщений, передаваемых в Corax.Пример
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logsПримечание
Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.
Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logs
На данном этапе минимальная конфигурация узлов кластера Corax завершается. Если не существует каких-либо ошибок в конфигурации, а также Zookeeper сконфигурирован корректно и запущен, то этого вполне достаточно для успешного запуска и работы кластера Corax.
Расширенная конфигурация кластера#
Конфигурирование производителя (producer)#
Минимальная конфигурация
Пример файла конфигураций producer.properties для одного брокера:
acks=1
bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102,
`hostname --f`:9103
Шаги создания конфигурационного файла:
Присвойте ключу
acksзначение, указывающее, что сообщение отправлено:0 — сообщение расположено в TCP-буфере для отправки по сети;
1 — сообщение успешно записано в партицию-лидер;
1 (all) — сообщение успешно записано на все
ISR(InSyncReplicasпараметр на Kafka broker(server.properties))
Пример
acks=1Для ключа
bootstrap.serversукажитеIP:PORTсервиса Corax.Пример
Если запущены 3 экземпляра Corax на том же сервере:
bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103Допускается также указывать IP-адреса, полные доменные имена.
Расширенная конфигурация
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.
Запуск каждого отдельного производителя Corax выполняется командой:
./bin/kafka-console-producer --broker-list `hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103 --topic test1 --producer.config etc/kafka/producer.properties
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
key.serializer |
Класс сериализатора для ключа, реализующего org.apache.kafka.common.serialization.Serializerinterface |
class |
Неприменимо |
value.serializer |
Класс сериализатора для значения, реализующего org.apache.kafka.common.serialization.Serializerinterface |
class |
Неприменимо |
acks |
Количество подтверждений, которые производитель должен получить от лидера, прежде чем считать запрос завершенным. Это определяет долговечность записей, которые будут отправлены. Разрешены следующие настройки: |
string |
1 |
bootstrap.servers |
Список пар «хост: порт», используемых для установления начального соединения с кластером Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде: host1:port1, host2:port2, … Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (можно указать более одного, на случай, если указанный сервер не работает) |
list |
"" |
buffer.memory |
Общее количество байтов памяти, которое производитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, производитель блокируется на время, заданное параметром max.block.ms, после чего он выдаст исключение. Этот параметр должен примерно соответствовать общей памяти, которую будет использовать производитель, но не является жесткой привязкой, поскольку не вся память, используемая производителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержания inflight-запросов |
long |
33554432 |
compression.type |
Тип сжатия для всех данных, генерируемых производителем. Допустимые значения: none (без сжатия), gzip, snappy, lz4 или zstd. Сжатие полных партий данных, поэтому эффективность разбивки на порции данных также повлияет на степень сжатия (большее дозирование означает лучшее сжатие) |
string |
none |
retries |
Установка значения больше нуля будет означать для клиента переотправку любой записи, отправка которой завершилась предположительно временной ошибкой. Эта попытка не отличается от случая, когда клиент получает ошибку. Разрешение повторов (retries) без установки параметра max.in.flight.requests.per.connection=1 потенциально изменит порядок записей, т. к. если два пакета отправляются в один раздел и первый пакет вызывает сбой и повторяется, но второй успешно записан, то запись о втором пакете может появиться первой. Запросы на запись будут завершены ошибкой до того, как количество попыток будет исчерпано, а если тайм-аут настроен, delivery.timeout.msexpires будет достигнут до успешного подтверждения. Пользователи обычно предпочитают не указывать этот параметр, а использовать delivery.timeout.msexpires для управления поведением повторной попытки |
int |
2,15E+09 |
ssl.key.password |
Пароль закрытого ключа в файле keystore. Это необязательно для клиента |
password |
null |
ssl.keystore.location |
Расположение закрытого ключа в файле keystore. Это необязательно для клиента |
string |
null |
ssl.keystore.password |
Пароль хранилища для файла хранилища ключей (keystore). Это необязательно для клиента и требуется, если используется параметр ssl.keystore.location |
password |
null |
ssl.truststore.location |
Расположение файла хранилища доверительных сертификатов (truststore) |
string |
null |
ssl.truststore.password |
Пароль для файла хранилища доверительных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, но проверка целостности отключена |
password |
null |
batch.size |
При отправке нескольких записей в один раздел производитель попытается объединить записи в меньшее количество запросов. Это повышает производительность как на клиенте, так и на сервере. Этот параметр управляет размером пакета по умолчанию в байтах. Не будет предпринята попытка объединения записей в пакеты большего размера. Запросы, отправленные брокерам, будут содержать несколько пакетов, по одному для каждой партиции с доступными для отправки данными. Небольшой размер пакета сделает дозирование менее распространенным и может уменьшить пропускную способность (нулевой размер пакета полностью отключит дозирование). Очень большой размер пакета может использовать память немного более расточительно, так как всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей |
int |
16384 |
client.dns.lookup |
Параметр управляет тем, как клиент использует DNS lookup. Если установлено значение use\all\dns\ips, то, когда поиск возвращает несколько IP-адресов для имени хоста, все они будут пытаться подключиться до сбоя подключения. Применяется как к bootstrap, так и к advertised серверам. Если установлено значение resolve\canonical\bootstrap\servers\only, каждая запись будет разрешена и расширена в список канонических имен |
string |
default |
client.id |
Строка id для передачи серверу при выполнении запросов. Позволяет отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в журнал запросов на стороне сервера |
string |
"" |
connections.max.idle.ms |
Закрытие неактивных соединений после количества миллисекунд, указанных для этого параметра |
long |
540000 |
delivery.timeout.ms |
Верхняя граница времени для отчета об успехе или сбое после получения ответа на вызов send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (acks) (если ожидается) и время, разрешенное для повторных отправок при сбоях. Producer может сообщить об ошибке отправки записи раньше, чем это обозначено в конфигурации, если обнаружена неустранимая ошибка, количество повторных попыток отправки было исчерпано или запись добавлена в пакет, который достиг окончания срока доставки (deadline) раньше. Значение этой конфигурации должно быть больше или равно сумме request.timeout.ms + linger.ms |
int |
120000 |
linger.ms |
Производитель группирует все записи, поступающие между запросами передачи, в один пакетированный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем они могут быть отправлены. Однако в некоторых случаях клиенту может понадобиться уменьшить количество запросов даже при умеренной нагрузке. Рассматриваемая настройка позволяет это сделать, добавляя небольшое количество искусственной задержки, то есть вместо немедленной отправки записи производитель будет ждать до заданной задержки, чтобы разрешить отправку других записей и отправить их вместе. Это можно рассматривать как аналог алгоритма Нейгла в TCP. Параметр определяет верхнюю границу задержки для дозирования: как только получен batch.size записей для партиции, они будут отправлены немедленно независимо от этого параметра, однако если количество байтов меньше, чем накопленных для этого раздела, будет выполняться задержка в течение указанного времени, ожидая появления большего количества записей. Этот параметр по умолчанию равен 0 (т. е. без задержки). Установка linger.ms=5, например, будет иметь эффект уменьшения количества отправленных запросов, но добавит до 5 мс задержки к записям, отправленным в отсутствие нагрузки |
int |
0 |
max.block.ms |
Параметр определяет, как долго KafkaProducer.send() и KafkaProducer.partitionsFor() будет блокировать. Эти методы могут быть заблокированы, так как буфер заполнен или метаданные недоступны. Блокировка в пользовательских сериализаторах или разделителе не будет учитываться в течение этого времени ожидания |
long |
60000 |
max.request.size |
Максимальный размер запроса в байтах. Этот параметр ограничивает количество партий записей, которые производитель отправляет в одном запросе, чтобы избежать отправки огромных запросов. Это также эффективно ограничивает максимальный размер пакета записи. Обратите внимание, что сервер имеет свой собственный размер пакета записи, который может отличаться от заданного в данном параметре |
int |
1048576 |
partitioner.class |
Класс разделителя, реализующий org.apache.kafka.clients.producer.Partitioner-интерфейс |
class |
org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes |
Размер буфера приема TCP (SO_RCVBUF) для использования при чтении данных. Если значение равно -1, будет использоваться ОС по умолчанию |
int |
32768 |
request.timeout.ms |
Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны. Значение должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера) для уменьшения возможности дублирования сообщений из-за ненужных попыток производителя |
int |
30000 |
sasl.client.callback.handler.class |
Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler |
class |
null |
sasl.jaas.config |
Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: |
password |
null |
sasl.kerberos.service.name |
Имя участника (принципала) Kerberos, под которым работает Kafka. Это можно определить либо в конфигурации JAAS Kafka, либо в конфигурации Kafka |
string |
null |
sasl.login.callback.handler.class |
Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс AuthenticateCallbackHandler. Для брокеров конфигурация обработчика обратного вызова входа (login callback handler) должна иметь префикс прослушивателя (listener) и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class |
null |
sasl.login.class |
Полное имя класса, который определяет интерфейс входа. Для брокеров конфигурация входа должна быть с префиксом listener'а и именем механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin |
class |
null |
sasl.mechanism |
Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого доступен поставщик безопасности. GSSAPI является механизмом по умолчанию |
string |
GSSAPI |
security.protocol |
Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL |
string |
PLAINTEXT |
send.buffer.bytes |
Размер буфера отправки TCP (SO_SNDBUF) для использования при отправке данных. Если значение равно -1, будет использоваться ОС по умолчанию |
int |
131072 |
ssl.enabled.protocols |
Список протоколов, разрешенных для SSL-соединений |
list |
TLSv1.2,TLSv1.1,TLSv1 |
ssl.keystore.type |
Формат файла хранилища ключей (key store). Параметр необязателен для клиента |
string |
JKS |
ssl.protocol |
Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимые значения: TLSv1.1 и TLSv1.2. SSL, SSLv2 и SSLv3 могут поддерживаться в старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности |
string |
TLS |
ssl.provider |
Имя поставщика безопасности, используемого для SSL-соединений. Значение по умолчанию — значение, установленное по умолчанию в JVM |
string |
null |
ssl.truststore.type |
Формат файла хранилища доверия (trust store) |
string |
JKS |
enable.idempotence |
Если задано значение true, производитель гарантирует, что в поток записывается ровно одна копия каждого сообщения. Если false, производитель повторяет попытку из-за сбоев брокера и т. д., может писать дубликаты повторенного сообщения в потоке. Обратите внимание, что для включения идемпотенции требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5, количество попыток переотправки (retries) было больше 0, а ack=all. Если эти значения явно не заданы пользователем, будут выбраны подходящие значения. Если установлены несовместимые значения, будет создано исключение ConfigException |
boolean |
false |
interceptor.classes |
Список классов для использования в качестве перехватчиков. Реализовывает org.apache.kafka.clients.producer.ProducerInterceptor-интерфейс, позволяющий перехватывать (и, возможно, изменять) записи, полученные производителем до их публикации в кластере Kafka. По умолчанию перехватчиков нет |
list |
"" |
max.in.flight.requests.per.connection |
Максимальное количество неподтвержденных запросов, отправляемых клиентом по одному соединению перед блокировкой. Если этот параметр больше 1 и есть неудачные отправки, существует риск переупорядочения сообщений из-за повторных попыток (т. е. если повторные попытки включены) |
int |
5 |
metadata.max.age.ms |
Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений в руководстве разделов, чтобы проактивно обнаружить новые брокеры или разделы |
long |
300000 |
metric.reporters |
Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporter интерфейса позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX |
list |
"" |
metrics.num.samples |
Количество выборок, сохраняемых для вычисления метрик |
int |
2 |
metrics.recording.level |
Самый высокий уровень записи для метрик |
string |
INFO |
metrics.sample.window.ms |
Окно времени, в котором вычисляется выборка метрик |
long |
30000 |
reconnect.backoff.max.ms |
Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного дрожания, чтобы избежать штормов соединения |
long |
1000 |
reconnect.backoff.ms |
Базовое время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру |
long |
50 |
retry.backoff.ms |
Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя |
long |
100 |
sasl.kerberos.kinit.cmd |
Путь к команде Kerberos kinit |
string |
/usr/bin/kinit |
sasl.kerberos.min.time.before.relogin |
Время сна потока входа между попытками обновления |
long |
60000 |
sasl.kerberos.ticket.renew.jitter |
Процент случайного jitter, добавленного к времени обновления |
double |
0.05 |
sasl.kerberos.ticket.renew.window.factor |
Поток входа будет спать до тех пор, пока не будет достигнут указанный коэффициент времени окна от последнего обновления до истечения срока действия билета, после чего он попытается обновить билет |
double |
0.8 |
sasl.login.refresh.buffer.seconds |
Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER |
short |
300 |
sasl.login.refresh.min.period.seconds |
Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER |
short |
60 |
sasl.login.refresh.window.factor |
Поток обновления входа будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%). В настоящее время применяется только к носителю OAUTHBEARER |
double |
0.8 |
sasl.login.refresh.window.jitter |
Максимальное количество случайного дрожания относительно времени жизни учетных данных, добавленного ко времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно. Значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER |
double |
0.05 |
ssl.cipher.suites |
Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров |
list |
null |
ssl.endpoint.identification.algorithm |
Алгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера |
string |
https |
ssl.keymanager.algorithm |
Алгоритм, используемый фабрикой key manager для SSL-соединений. Значение по умолчанию — это алгоритм фабрики Key manager, настроенный для виртуальной машины Java |
string |
SunX509 |
ssl.secure.random.implementation |
Реализация SecureRandom PRNG, используемая для операций шифрования SSL |
string |
null |
ssl.trustmanager.algorithm |
Алгоритм, используемый фабрикой trust manager для SSL-соединений. Значение по умолчанию — это заводской алгоритм trust manager, настроенный для виртуальной машины Java |
string |
PKIX |
transaction.timeout.ms |
Максимальное время в миллисекундах, в течение которого координатор транзакций будет ждать обновления статуса транзакции от производителя, прежде чем проактивно прервать текущую транзакцию. Если это значение больше, чем transaction.max.timeout.ms в брокере, запрос завершится ошибкой InvalidTransactionTimeout |
int |
60000 |
transactional.id |
Идентификатор транзакции, используемый для доставки транзакций. Это включает семантику надежности, которая охватывает несколько сеансов производителя, так как это позволяет клиенту гарантировать, что транзакции с использованием того же TransactionalId были завершены до начала любых новых транзакций. Если идентификатор транзакции не указан, производитель ограничивается идемпотентной доставкой. Обратите внимание, что enable.idempotencemust должна быть включена, если настроен идентификатор транзакции. Значение по умолчанию равно null, что означает невозможность использования транзакций. По умолчанию для транзакций требуется кластер по крайней мере из трех брокеров, что является рекомендуемым параметром для производства; для разработки можно изменить это, настроив параметр брокера transaction.state.log.replication.factor |
string |
null |
Конфигурирование потребителя (consumer)#
Минимальная конфигурация#
Пример файла конфигураций consumer.properties для одного брокера:
group.id=test-consumer-group
bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102,
`hostname --f`:9103
Шаги создания конфигурационного файла:
Присвойте ключу
group.idнеобходимый идентификатор группы, под которым будет запущен consumer. Можно запускать несколько consumers под одним и тем жеgroup.id, что позволит считывать данные в несколько потоков (с нескольких серверов), обеспечивая отсутствие дублирования вычитываемых данных.Пример
group.id=test-consumer-groupДля ключа
bootstrap.serversукажитеIP:PORTсервиса Corax.Пример
Если запущены 3 экземпляра Corax на том же сервере:
bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103Допускается также указывать IP-адреса, полные доменные имена.
Расширенная конфигурация#
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.
Запуск каждого отдельного потребителя Corax выполняется командой:
./bin/kafka-console-consumer --broker-list `hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103 --topic test1 --consumer.config etc/kafka/consumer.properties --from-beginning
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
key.deserializer |
Класс десериализатора для ключа, реализующего org.apache.kafka.common.serialization.Deserializerinterface |
class |
|
value.deserializer |
Класс десериализатора для значения, реализующего org.apache.kafka.common.serializationDeserializerinterface |
class |
|
bootstrap.servers |
Список пар «хост: порт», используемых для установления начального соединения с кластером Platform V Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде host1: port1, host2:port2,… Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (вы можете указать более одного, на случай, если указанный сервер не работает) |
list |
"" |
fetch.min.bytes |
Минимальный объем данных, который сервер должен вернуть для запроса на выборку (fetch request). Если данных недостаточно, запрос будет ждать, пока накопится необходимое количество данных, прежде чем ответить на запрос. Значение по умолчанию 1 байт означает, что запросы на выборку отвечают, как только доступен один байт данных или время ожидания запроса на выборку данных заканчивается. Установка этого значения больше 1 заставит сервер ждать накопления больших объемов данных, что может немного повысить пропускную способность сервера за счет некоторой дополнительной задержки |
int |
1 |
group.id |
Уникальная строка, идентифицирующая группу потребителей, к которой принадлежит данный потребитель. Это свойство является обязательным, если потребитель использует функции управления группами с помощью subscribe (topic) или стратегии управления смещением на основе Corax |
string |
null |
heartbeat.interval.ms |
Ожидаемое время между heartbeats (передачей сигнала «я жив») координатору потребителя при использовании средств управления группы Kafka. Передача сигнала используется для обеспечения активного сеанса потребителя и облегчения перебалансировки, когда новые потребители присоединяются или покидают группу. Значение должно быть установлено ниже session.timeout.ms, однако обычно должно устанавливается не выше 1/3 от session.timeout.ms. Его можно уменьшать с целью большего контроля предполагаемого времени для нормальных перебалансировок |
int |
3000 |
max.partition.fetch.bytes |
Максимальный объем данных на партицию, который будет возвращен сервером. Записи извлекаются потребителем пакетами (batch). Если первый пакет записей в первом непустом разделе выборки больше этого предела, пакет все равно будет возвращен, чтобы гарантировать, что потребитель может добиться успеха по извлечению данных. Максимальный размер пакета записей, принятый брокером, определяется посредством |
int |
1048576 |
session.timeout.ms |
Тайм-аут, используемый для обнаружения сбоев потребителей при использовании средства управления группами Corax. Потребитель посылает периодические heartbeats, чтобы сообщить брокеру, что он жив (не завис). Если до истечения этого тайм-аута брокер не получит heartbeat, то брокер удалит этого потребителя из группы и инициирует перебалансировку. Значение должно находиться в допустимом диапазоне, настроенном в конфигурации брокера group.min.session.timeout.ms и group.max.session.timeout.ms |
int |
10000 |
ssl.key.password |
Пароль закрытого ключа в файле keystore. Необязательный параметр для клиента |
password |
null |
ssl.keystore.location |
Расположение файла хранилища ключей keystore. Необязательный параметр для клиента и может использоваться для двусторонней проверки подлинности (authentication) |
string |
null |
ssl.keystore.password |
Пароль хранилища для файла хранилища ключей keystore. Необязательный параметр для клиента и требуется только в случае, если объявлено ssl.keystore.location |
password |
null |
ssl.truststore.location |
Расположение файла хранилища доверенных сертификатов (truststore) |
string |
null |
ssl.truststore.password |
Пароль для файла хранилища доверенных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, однако проверка целостности отключена |
password |
null |
auto.offset.reset |
Определяет действие, выполняемое, если в Corax нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены): |
string |
latest |
client.dns.lookup |
Управляет тем, как клиент использует DNS-запросы. |
string |
default |
connections.max.idle.ms |
Закрытие незанятых соединений после указанного количества миллисекунд |
long |
540000 |
default.api.timeout.ms |
Задает время ожидания (в миллисекундах) для API-интерфейсов потребителя, которые могут блокировать. Эта конфигурация используется в качестве тайм-аута по умолчанию для всех операций потребителя, которые явно не принимают timeoutparameter |
int |
60000 |
enable.auto.commit |
Если установлено значение true, смещение потребителя будет периодически фиксироваться в фоновом режиме |
boolean |
true |
exclude.internal.topics |
Следует ли предоставлять потребителю записи из внутренних (системных) топиков (например, топик смещений). Если установлено значение true, единственным способом получения записей из внутренних (системных) топиков является подписка на них |
boolean |
true |
fetch.max.bytes |
Максимальный объем данных, который сервер должен вернуть в ответ на запрос по выборке. Записи извлекаются потребителем пакетами, и если первый пакет записей в первом непустом разделе выборки больше этого значения, пакет записей все равно будет возвращен, чтобы гарантировать, что потребитель работоспособен и может извлекать данные (есть все разрешения и д.р.). Таким образом, это не абсолютный максимум. Максимальный размер пакета записей, принятый брокером, определяется посредством message.max.bytes (настройки брокеров) или max.message.bytes (настройки топиков). Обратите внимание, что потребитель выполняет несколько выборок параллельно |
int |
52428800 |
isolation.level |
Управляет чтением сообщений, написанных транзакционно. Если установлено значение read\committed, consumer.poll () будет возвращать только транзакционные сообщения, которые были зафиксированы. Если установлено значение read\uncommitted' (по умолчанию), consumer.poll () вернет все сообщения, даже транзакционные сообщения, которые были прерваны. Нетранзакционные сообщения будут возвращены безоговорочно в любом режиме. Сообщения всегда возвращаются в порядке смещения. Следовательно, в режиме read\committed consumer.poll () будет возвращать сообщения только до последнего стабильного смещения (LSO — LastStableOffset), которое меньше смещения первой открытой транзакции. В частности, любые сообщения, появляющиеся после сообщений, относящихся к текущим транзакциям, будут удерживаться до завершения соответствующей операции. В результате read\committed-потребители не смогут читать данные до hight watermark (высокого водяного знака), когда есть текущая транзакция (inflight). Кроме того, если установлено значение read\committed, метод seekToEnd вернет LSO. |
string |
read\uncommitted |
max.poll.interval.ms |
Максимальная задержка между вызовами poll () при использовании управления группами потребителей. Это накладывает ограничение на количество времени, которое потребитель может простаивать до получения других записей. Если poll () не вызывается до истечения этого тайм-аута, то потребитель считается неработоспособным и группа будет перебалансироваться, чтобы переназначить разделы другому члену |
int |
300000 |
max.poll.records |
Максимальное |
int |
500 |
partition.assignment.strategy |
Имя класса стратегии назначения разделов, которую клиент будет использовать для распределения партиций между экземплярами-потребителями при использовании управления группами |
list |
class org.apache.kafka.clients.consumer.RangeAssignor |
receive.buffer.bytes |
Размер буфера приема TCP (SO\RCVBUF), используемый при чтении данных. Если значение равно -1, по умолчанию будет использоваться размера буфера, установленный в ОС |
int |
65536 |
request.timeout.ms |
Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны |
int |
30000 |
sasl.client.callback.handler.class |
Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler |
class |
null |
sasl.jaas.config |
Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: 'loginModuleClass controlFlag (optionName=optionValue)*;'. Для брокеров конфигурация должна иметь префикс слушателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required |
password |
null |
sasl.kerberos.service.name |
Имя принципала Kerberos, под которым работает Rfarf. Это можно определить либо в конфигурации JAAS Corax, либо в конфигурации Kafka |
string |
null |
sasl.login.callback.handler.class |
Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс обработчика AuthenticateCallbackHandler. Для брокеров конфигурация logincallbackhandler должна иметь префикс прослушивателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class |
null |
sasl.login.class |
Полное имя класса, реализующего интерфейс входа в систему. Для брокеров конфигурация входа должна иметь префикс прослушивателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin |
class |
null |
sasl.mechanism |
Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого поставщик безопасности доступен. GSSAPI является механизмом по умолчанию |
string |
GSSAPI |
security.protocol |
Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL |
string |
PLAINTEXT |
send.buffer.bytes |
Размер буфера отправки TCP (SO\SNDBUF), используемый при отправке данных. Если значение равно -1, по умолчанию будет использоваться параметр, установленный в ОС |
int |
131072 |
ssl.enabled.protocols |
Список протоколов, разрешенных для SSL-соединений |
list |
TLSv1.2,TLSv1.1,TLSv1 |
ssl.keystore.type |
Формат файла хранилища ключей. Параметр необязателен для клиента |
string |
JKS |
ssl.protocol |
Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимыми значениями в современных JVM являются TLS, TLSv1.1 и TLSv1.2. SSL, SSLv2 и SSLv3 могут поддерживаться в более старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности |
string |
TLS |
ssl.provider |
Имя поставщика безопасности, используемого для SSL-соединений. Значением по умолчанию является поставщик безопасности по умолчанию для JVM |
string |
null |
ssl.truststore.type |
Формат файла хранилища доверительных сертификатов (truststore) |
string |
JKS |
auto.commit.interval.ms |
Частота в миллисекундах, с которой потребитель смещает offsets автоматически в Corax, если enable.auto.commit=true |
int |
5000 |
check.crcs |
Автоматически проверять CRC32 потребляемых (вычитываемых) записей. Это гарантирует отсутствие повреждения сообщений в сети или на диске. Эта проверка добавляет некоторые накладные расходы, поэтому она может быть отключена в случаях, требующих экстремальной производительности |
boolean |
true |
client.id |
Строка id для передачи серверу при выполнении запросов. Цель этого состоит в том, чтобы иметь возможность отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в журнал запросов на стороне сервера |
string |
"" |
fetch.max.wait.ms |
Максимальное количество времени, на которое сервер заблокируется перед ответом на запрос fetch, если данных недостаточно для немедленного удовлетворения требования fetch.min.bytes |
int |
500 |
interceptor.classes |
Список классов для использования в качестве перехватчиков. Реализация интерфейса org.apache.kafka.clients.ConsumerInterceptor позволяет перехватывать (и, возможно, изменять) записи, полученные потребителем. По умолчанию перехватчиков нет |
list |
"" |
metadata.max.age.ms |
Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений лидеров партиций, чтобы проактивно обнаружить новые брокеры или партиции |
long |
300000 |
metric.reporters |
Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporterinterface позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX |
list |
"" |
metrics.num.samples |
Количество выборок, сохраняемых для вычисления метрик |
int |
2 |
metrics.recording.level |
Самый высокий уровень записи для метрик |
string |
INFO |
metrics.sample.window.ms |
Окно времени, в котором вычисляется выборка метрик |
long |
30000 |
reconnect.backoff.max.ms |
Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, к которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного дрожания, чтобы избежать штормов соединения |
long |
1000 |
reconnect.backoff.ms |
Базовое время ожидания перед попыткой повторного подключения к заданному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру |
long |
50 |
retry.backoff.ms |
Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя |
long |
100 |
sasl.kerberos.kinit.cmd |
Путь к команде Kerberos kinit |
string |
/usr/bin/kinit |
sasl.kerberos.min.time.before.relogin |
Время сна потока входа между попытками обновления |
long |
60000 |
sasl.kerberos.ticket.renew.jitter |
Процент случайного дрожания, добавленного ко времени обновления |
double |
0.05 |
sasl.kerberos.ticket.renew.window.factor |
Поток входа будет спать до тех пор, пока не будет достигнут указанный коэффициент времени окна от последнего обновления до истечения срока действия билета, после чего он попытается обновить билет |
double |
0.8 |
sasl.login.refresh.buffer.seconds |
Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); если значение не указано, используется значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к OAUTHBEARER |
short |
300 |
sasl.login.refresh.min.period.seconds |
Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию — 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к OAUTHBEARER |
short |
60 |
sasl.login.refresh.window.factor |
Поток обновления входа (login) будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%) |
double |
0.8 |
sasl.login.refresh.window.jitter |
Максимальное количество случайного дрожания относительно времени жизни учетных данных, добавленного к времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно; значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER |
double |
0.05 |
ssl.cipher.suites |
Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров |
list |
null |
ssl.endpoint.identification.algorithm |
Алгоритм идентификации конечной точки(endpoint) для проверки имени хоста сервера с помощью сертификата сервера |
string |
https |
ssl.keymanager.algorithm |
Алгоритм, используемый фабрикой менеджера ключей для SSL-соединений. Значение по умолчанию — это алгоритм key manager factory, настроенный для виртуальной машины Java |
string |
SunX509 |
ssl.secure.random.implementation |
Реализация SecureRandom PRNG, используемая для операций шифрования SSL |
string |
null |
Конфигурирование сквозного шифрования#
Симметричное шифрование#
На стороне producer#
Producer конфигурируется следующими параметрами:
Наименование |
Определение |
Тип значения |
Значение |
|---|---|---|---|
key.serializer |
Полное имя класса сериализатора ключа |
String |
youre.adress.kafka.serialization.EncryptedByteArraySerializer |
value.serializer |
Полное имя класса сериализатора значения |
String |
youre.adress.kafka.serialization.EncryptedByteArraySerializer |
e2e.serializer.encryption.secret.key |
Ключ шифрования, секретная фраза |
String |
Задается пользователем |
e2e.serializer.encryption.class |
Encoder. Основная логика по шифрованию описывается в этом классе. Данный класс должен реализовывать интерфейс ru.sbt.kafkase.security.Encoder |
String |
Задается пользователем. Для теста можно использовать реализацию из Corax: youre.adress.kafka.security.SimpleDataEncoder |
e2e.serializer.encryption.salt.provider.class |
Соль провайдер. Класс для генерации соли, смешиваемой с данными и ключом шифрования для последующего шифрования данных. Данный класс должен реализовывать интерфейс ru.sbt.kafkase.security.SaltProvider |
String |
Задается пользователем. Для теста можно использовать реализацию из Corax: youre.adress.kafka.security.FileSaltProvider |
e2e.serializer.encryption.security.encoding.salt.location |
Путь к файлу с солью. Если используется реализация соль провайдера KafkaSberEdition: youre.adress.kafka.security.FileSaltProvider |
String |
Задается пользователем. Соль должна быть не менее 16 байт (16 символов) |
На стороне consumer#
Consumer конфигурируется следующими параметрами:
Наименование |
Определение |
Тип значения |
Значение |
|---|---|---|---|
key.deserializer |
Полное имя класса сериализатора ключа |
String |
youre.adress.kafka.serialization.EncryptedByteArrayDeserializer |
value.deserializer |
Полное имя класса сериализатора значения |
String |
youre.adress.kafka.serialization.EncryptedByteArrayDeserializer |
e2e.deserializer.encryption.secret.key |
Ключ шифрования, секретная фраза |
String |
Задается пользователем |
e2e.deserializer.encryption.class |
Decoder. Основная логика по расшифровки описывается в этом классе. Данный класс должен реализовывать интерфейс ru.sbt.kafkase.security.Decoder |
String |
Задается пользователем. Для теста можно использовать реализацию из Corax: youre.adress.kafka.security.SimpleDataDecoder |
e2e.deserializer.encryption.salt.provider.class |
Соль провайдер. Класс для генерации соли, смешиваемой с данными и ключом шифрования для последующего шифрования данных. Данный класс должен реализовывать интерфейс ru.sbt.kafkase.security.SaltProvider |
String |
Задается пользователем. Для теста можно использовать реализацию из Corax: youre.adress.kafka.security.FileSaltProvider |
e2e.deserializer.encryption.security.encoding.salt.location |
Путь к файлу с солью. Если используется реализация соль провайдера KafkaSberEdition: youre.adress.kafka.security.FileSaltProvider |
String |
Задается пользователем. Соль должна быть не менее 16 байт (16 символов) |
Внимание!
Для успешной расшифровки данных, полученных consumer, необходимо, чтобы соль и секретный ключ consumer совпадали с солью и секретным ключом producer.
Примеры конфигурации#
Пример producer.config
value.serializer=youre.adress.kafka.serialization.EncryptedByteArraySerializer
key.serializer=youre.adress.kafka.serialization.EncryptedByteArraySerializer
e2e.serializer.encryption.secret.key=secret
e2e.serializer.encryption.salt.provider.class=youre.adress.kafka.security.FileSaltProvider
e2e.serializer.encryption.class=youre.adress.kafka.security.SimpleDataEncoder
e2e.serializer.encryption.security.encoding.salt.location=config/salt.txt
Пример consumer.config
value.deserializer=youre.adress.kafka.serialization.EncryptedByteArrayDeserializer
key.deserializer=youre.adress.kafka.serialization.EncryptedByteArrayDeserializer
e2e.deserializer.encryption.secret.key=secret
e2e.deserializer.encryption.salt.provider.class=youre.adress.kafka.security.FileSaltProvider
e2e.deserializer.encryption.class=youre.adress.kafka.security.SimpleDataDecoder
e2e.deserializer.encryption.security.encoding.salt.location=config/salt.txt
Асимметричное шифрование#
Конфигурирование producer и consumer, работающих с шифрованием, возможно только путем редактирования кода. Properties - класс в java, с помощью которого происходит конфигурирование.
Для producer и consumer используется алгоритм шифрования/расшифровки RSA/ECB/PKCS1Padding.
На стороне producer#
Producer конфигурируется следующими параметрами:
Наименование |
Определение |
Обязательный параметр |
Тип значения |
Значение по умолчанию |
Значение для включения шифрования |
|---|---|---|---|---|---|
key.serializer |
Полное имя класса сериализатора ключа |
Да |
String |
- |
youre.adress.kafka.serialization.EncryptedCertByteArraySerializer |
value.serializer |
Полное имя класса сериализатора значения |
Да |
String |
- |
youre.adress.kafka.serialization.EncryptedCertByteArraySerializer |
ssl.truststore.location |
Путь к keystore |
Да |
String |
- |
Путь к файлу |
ssl.truststore.type |
Тип keystore |
Нет |
String |
JKS |
JKS |
ssl.truststore.password |
Пароль от keystore |
Да |
Экземпляр класса org.apache.kafka.common.config.types.Password |
- |
new Password (пароль) |
ssl.key.password |
Пароль от ключа |
Да |
Экземпляр класса org.apache.kafka.common.config.types.Password |
- |
new Password (пароль) |
serializer.encryption.key.alias |
Псевдоним (alias) ключа |
Да |
String |
encryptionSend |
Alias ключа |
Внимание!
Пароли в
Propertiesдолжны быть представлены экземплярами классаorg.apache.kafka.common.config.types.Password. Это маскирует пароль в логах. Другие типы не поддерживаются.
На стороне consumer#
Consumer конфигурируется следующими параметрами:
Наименование |
Определение |
Обязательный параметр |
Тип значения |
Значение по умолчанию |
Значение для включения шифрования |
|---|---|---|---|---|---|
key.deserializer |
Полное имя класса десериализатора ключа |
Да |
String |
- |
youre.adress.kafka.serialization.EncryptedCertByteArrayDeserializer |
value.deserializer |
Полное имя класса десериализатора значения |
Да |
String |
- |
youre.adress.kafka.serialization.EncryptedCertByteArrayDeserializer |
ssl.truststore.location |
Путь к keystore |
Да |
String |
- |
Путь к файлу |
ssl.truststore.type |
Тип keystore |
Нет |
String |
JKS |
JKS |
ssl.truststore.password |
Пароль от keystore |
Да |
org.apache.kafka.common.config.types.Password |
- |
new Password (пароль) |
ssl.key.password |
Пароль от ключа |
Да |
org.apache.kafka.common.config.types.Password |
- |
new Password (пароль) |
serializer.encryption.key.alias |
Псевдоним (alias) ключа |
Да |
String |
encryptionSend |
Alias ключа |
Внимание!
Пароли в
Propertiesдолжны быть представлены экземплярами классаorg.apache.kafka.common.config.types.Password. Это маскирует пароль в логах. Другие типы не поддерживаются.
Обновление SSL сертификатов на кластере Corax без недоступности кластера Corax#
Алгоритм обновления SSL сертификатов на брокерах кластера Corax:
Остановите брокер, который еще не обновлялся.
Для остановленного брокера обновите SSL сертификат.
Запустите брокер, и дождитесь завершения полной репликации (пример команды:
kafka-topics --zookeeper host:2181 --describe --under-replicated-partitions).Повторите шаг 1 для следующего брокера. Если это был последний брокер — обновление завершено.
Конфигурирование аудита событий над брокером в log-файл#
Для включения аудита над брокером Corax в log-файл, добавьте в конфигурационный файл
KAFKA_DIR/config/server.propertiesстроки:kafka.se.audit.enable=true kafka.se.audit.provider=kafka.audit.provider.LogAuditProviderгде
kafka.se.audit.enable— глобальное включение аудита;Примечание
По умолчанию будет включен аудит записи действий на брокере в log-файл
kafka-audit.logна уровне INFO.При необходимости указать другой log-файл для аудита и/или задать свои параметры (уровень логирования, вид записи, форматирование или режим записи rolling update), укажите appender в файле
config/log4j.properties:log4j.appender.auditFileAppender = org.apache.log4j.RollingFileAppender //включение режима записи rolling update log4j.appender.auditFileAppender.File = ${kafka.logs.dir}/kafka-audit.log //путь до log-файла log4j.appender.auditFileAppender.layout.ConversionPattern = [%d] %p %m (%c)%n //формат даты и времени log4j.logger.kafka.audit.provider.LogAuditProvider = INFO, auditFileAppender //уровень логирования
Конфигурирование Schema Registry#
Schema Registry REST Server#
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
true |
Разрешает или запрещает любые изменения |
|
null |
Путь к файлу, где будут сохраняться настройки совместимости и режим subjects |
|
null |
Настройка соединения с брокером Kafka для сохранения схем |
|
true |
Включает Authentication and Authorization для REST Server |
|
classpath:default-acl.json |
ACL правила по умолчанию |
|
classpath:users.properties |
Файл с пользователями и паролями |
|
youre.adress.kafka.security.SimpleTextPasswordDecoder |
Имя класса, который зашифровывает пароли; имплементирующий интерфейс |
|
youre.adress.kafka.security.FileSaltProvider |
Имя класса провайдера соли, имплементирующий интерфейс |
|
null |
Ключ для шифрования |
|
null |
Путь к файлу, содержащему salt |
|
true |
Включение отправки событий в аудит |
|
null |
URL, на который TsAuditProvider будет отправлять модели |
|
null |
URL, на который TsAuditProvider будет отправлять события |
|
null |
Имя класса, который зашифровывает пароли; имплементирующий интерфейс |
|
null |
Ключ для шифрования |
|
null |
Имя класса провайдера соли, имплементирующий интерфейс |
|
null |
Путь к файлу, содержащему salt |
|
null |
Путь к truststore |
|
null |
Пароль от truststore |
|
null |
Путь к keystore |
|
null |
Пароль от keystore |
|
TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_128_GCM_SHA256 |
Список разрешенных cipher suites |
|
TLS1.2 для jdk < 11, TLS1.2,TLS1.3 для jdk >= 11 |
The list of protocols enabled for SSL connections |
|
"JKS" |
Тип keystore |
|
"JKS" |
Тип truststore |
|
"SunX509" |
Алгоритм, используемый keymanager manager factory для SSL-подключения |
|
"SunX509" |
Алгоритм, используемый trust manager factory для SSL-подключения |
|
"https" |
Алгоритм для валидации имени сервер хоста с использованием сертификата сервера |
REST API SSL#
Настраивается стандартным для Spring-boot способом. Инструкция приведена в официальной документации.
Для шифрования/расшифровки паролей к хранилищам сертификатов необходимо использовать следующие настройки:
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
null |
Дешифрование настроек приложения. Шифрованное значение должно иметь вид |
|
null |
Ключ для шифрования |
|
null |
Имя класса провайдера соли, имплементирующий интерфейс |
|
null |
Путь к файлу содержащего salt |
Если не указана настройка schema.registry.rest.security.encoding.class, значения обрабатываться не будут.
Значение настройки аналогично настройкам Kafka:
schema.registry.rest.security.encoding.class=youre.adress.kafka.security.SimpleTextPasswordDecoder
schema.registry.rest.security.encoding.salt=youre.adress.kafka.security.FileSaltProvider
schema.registry.rest.security.encoding.key=KEY_1
schema.registry.rest.security.encoding.salt.location=classpath:/salt.txt
simple.encrypted.value=${decrypt:uKLIwVWQXoYgOBcqtN4DXA==} #На старте значение будет дешифровано.
SchemaRegistryClient#
SchemaRegistryClient - Java API для выполнения всех запросов к Schema Registry REST Server.
SchemaRegistryClient кеширует данные, полученные из REST Server. Поэтому, если схемы были пересозданы (удалены и повторно добавлены) или добавлена новая версия схемы, то приложение, которое использует SchemaRegistryClient, нужно перезапустить.
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
null |
Адрес schema registry REST server |
|
null |
Имя пользователя |
|
null |
Пароль пользователя |
|
null |
Сконфигурированный контекст для безопасного соединения |
|
null |
Флаг, отключающий проверку имени хоста |
SchemaRegistrySerde#
SchemaRegistrySerde - компонент Schema Registry, сериализатор (десериализатор), который:
сериализует данные и записывает в топик;
читает данные из топика и десериализует.
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
null |
Адрес schema registry REST server |
|
null |
Имя пользователя для подключения к REST server |
|
null |
Пароль пользователя для подключения к REST server. Указывая пароль, шифрованный стандартным для Corax способом, используйте постфикс – |
|
AVRO |
Тип данных и схемы. Поддерживаются AVRO и JSON |
|
null |
Имя класса, реализующего интерфейс |
|
true |
Использовать последнюю на момент старта версию схемы |
|
null |
|
Для расшифровки пароля клиента следующие настройки:
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
null |
Дешифрование настроек приложения. Шифрованное значение должно иметь вид |
|
null |
Ключ для шифрования |
|
null |
Имя класса провайдера соли, имплементирующий интерфейс |
|
null |
Путь к файлу, содержащему salt |
SubjectNameStrategy — реализация данного интерфейса генерирует 2 вида идентификаторов.
Если стратегия имен по умолчанию не подходит, реализуйте этот интерфейс самостоятельно.
subject— имя субъекта по имени топика и признаку isKey. По умолчанию дляvalueимя субъекта совпадает с именем топика, в конце добавляется-value. Пример:accounts-value.Для ключей добавляется
-key. Пример:accounts-key.version— ключ, в котором будет лежать номер версии схемы. По умолчанию[subject].version. Пример:accounts.version=2— использовать вторую версию схемыaccounts;accounts-key.version=3— использовать третью версию схемы для ключей топикаaccounts.
Настройки брокера#
Для безопасной конфигурации records policy, настройки могут быть указаны в параметрах брокера:
[name] — название преднастроенной policy. Его надо указывать при создании топика, который будет проверяться с этой policy.
records.policy.[name].class.name— имя класса или название policy, реализующего проверку сообщений на уровне брокера.records.policy.[name].param1,records.policy.[name].param1— параметры конфигурации policy. Доступны с методаRecordsPolicy#configure.
Пример:
records.policy.secure-policy.class.name=youre.adress.kafka.schemaregistry.SchemaRegistryValuesRecordsPolicy
# Параметры подключения к schema registry REST server.
records.policy.secure-policy.url=<Schema-registry-REST-server-address>
records.policy.secure-policy.user=user
# Пароль зашифрован.
records.policy.secure-policy.password.encrypted=uKLIwVWQXoYgOBcqtN4DXA==
# Параметры для расшифровки пароля.
# На стороне пользователя kafka знаний о параметрах подключения к REST server не требуется.
# Достаточно при создании топика указать `records.policy.name=secure-policy`
records.policy.secure-policy.security.encoding.class=youre.adress.kafka.security.SimpleTextPasswordDecoder
records.policy.secure-policy.security.encoding.salt=youre.adress.kafka.security.FileSaltProvider
records.policy.secure-policy.security.encoding.key=KEY_1
records.policy.secure-policy.security.encoding.salt.location=classpath:/salt.txt
Настройки топика#
Настройки конфигурации Schema Registry при создании или изменении топика:
Имя |
Значение по умолчанию |
Описание |
|---|---|---|
|
false |
Включает или выключает проверку данных топика с помощью Message Policy |
|
null |
Имя класса или название policy, реализующего проверку сообщений на уровне брокера. Для проверки по схеме должен быть указан класс |
|
null |
Строка конфигурации. Параметры разделяются запятой. Формат |
где
<sr-host>– хост, на котором запущен сервер Schema Registry;<sr-port>– порт, заданный в конфигурационном файлеrest.properties(по умолчанию 8080).
Особенности обновления consumers и producers#
В любом НЕ
*_TRANSITIVEрежиме одновременно могут работать приложения, которые используют версии схемы, различающиеся не более чем на 1.Обновление в режиме
BACKWARDилиBACKWARD_TRANSISTIVE: сначала обновите consumers, затем producers.Обновление в режиме
FORWARDилиFORWARD_TRANSISTIVE: сначала обновите producers, затем consumers.В режимах
FULL,FULL_TRANSITIVE,NONEпорядок обновления не важен.