Сценарии администрирования#
При выполнении действий, описанных в сценариях администрирования, пользователю должны быть назначены соответствующие разрешения из ролевой модели.
Разрешения на выполнение операций полностью соответствуют разрешениям Apache Kafka.
Конфигурирование ZooKeeper#
Конфигурирование узла ZooKeeper осуществляется в файле config/zookeeper.properties.
Допускается создание файлов конфигурации с различными именами, например:
config/zookeeper-1-node-cluster.properties
config/zookeeper-3-node-cluster.properties
config/zookeeper-3-node-cluster-on-1-machine.properties
config/zookeeper-5-node-cluster.properties
При запуске ZooKeeper достаточно указать нужный конфигурационный файл.
Конфигурирование узла ZooKeeper#
В данном подразделе описывается конфигурирование ZooKeeper для запуска в качестве standalone-приложения на одной машине. Этот вид конфигурации не обеспечивает отказоустойчивости и не рекомендован для использования в промышленной эксплуатации.
Конфигурирование узла Apache ZooKeeper осуществляется в файле zookeeper.properties.
Минимальная конфигурация#
Пример конфигурационного файла zookeeper.properties для одного узла:
dataDir=/path/to/zookeeper-data
clientPort=2181
tickTime=2000
Шаги создания конфигурационного файла:
Для переменной
dataDirукажите путь к директории, в которую ZooKeeper будет сохранять файлы с данными, необходимыми ему для работы.Пример
dataDir=/var/log/kafka_2.11-0.9.0.0/zookeeper-dataДля переменной
clientPortукажите номер порта, к которому будут подключаться клиенты.Пример
clientPort=2181Для переменной
tickTimeукажите единицу измерения времени ZooKeeper — это время в миллисекундах, которое уходит на один «тик» или «такт» ZooKeeper.tickTimeнеявно используется в настройках для различных временных параметров ZooKeeper. Например, для временной настройкиinitLimit=10— это означает, чтоinitLimitбудет выполняться за10 * tickTimeмиллисекунд.Пример
tickTime=2000
Расширенная конфигурация#
Существует ряд дополнительных настроек, которые можно включить в конфигурационный файл:
maxClientCnxns— максимально допустимое количество соединений (на уровне сокетов), устанавливаемых клиентом (IP-адрес) с одним из серверов кластера ZooKeeper.Если значение равно нулю или настройка не указана в файле конфигураций, то количество одновременных подключений считается неограниченным.
Данная настройка позволяет предотвратить некоторые виды DoS-атак.
Пример
maxClientCnxns=0dataLogDir— позволяет выделить отдельную директорию для хранения транзакционного лога ZooKeeper. По умолчанию все транзакционные логи ZooKeeper (файлыlog.) хранятся вместе с файламиsnapshot., т. н. «снепшотами», в одной общей директорииdataDir.Свойство
dataLogDirможет быть использовано для сохранения транзакционных логов в памяти отдельного устройства, т. е. в памяти другого физического, а не логического диска, для достижения наилучшей производительности (за счет уменьшения нагрузки на диск).Пример
dataLogDir=/another/path/to/zookeeper-logsДанные ZooKeeper — это персистентные (неизменяемые) копии ZooKeeper-узлов (
znodes), хранимые соответствующим ансамблем на диске в виде файлов двух видов:Транзакционные логи (
transaction log) — это файлы с префиксомlog., в которые в последовательном порядке записываются все изменения, происходящие с ZooKeeper-узлами.Снепшоты (
snapshot) — это файлы с префиксомsnapshot.Когда файл транзакционного лога достигает определенного размера, создается копия текущего состояния всех ZooKeeper-узлов в виде снепшота. Этот снепшот заменяет все предыдущие логи.
Примечание
Устаревшие версии файлов транзакционных логов и снепшотов не удаляются с диска сервером ZooKeeper — эта задача полностью ложится на администратора ZooKeeper.
globalOutstandingLimit— ограничение времени, позволяющее регулировать количество обращений клиентов к серверу ZooKeeper. Поскольку клиенты отправляют запросы быстрее, чем сервер успевает их обработать, сервер может не справиться и израсходовать всю доступную память. По умолчанию число запросов, находящихся в обработке в некоторый момент времени, не превышает1000.Пример
globalOutstandingLimit=1000preAllocSize— размер блока лог-файлов транзакций. По умолчанию равен 64 Мбайт. Использование транзакционного лога минимизирует количество обращений к диску. Если снепшоты ZooKeeper создаются слишком часто, размер логов транзакций может не достигать 64 Мбайт. В таких ситуациях включение этого параметра позволяет оптимизировать расход объема дискового пространства.snapCount— число транзакций, которые будут записаны в лог транзакций после создания очередного снепшота (snapshot) — файла данных ZooKeeper. По умолчанию —100000.Чтобы предотвратить одновременное создание снепшота всеми серверами в кворуме, каждый сервер ZooKeeper сделает снепшот, когда количество транзакций в журнале транзакций достигнет случайного значения, сгенерированного во время выполнения в диапазоне [snapCount/2+1, snapCount].
Пример
snapCount =100000traceFile— файл трассировки в форматеtraceFile.year.month.day, в который будут записываться все запросы клиентов. Полезен в целях отладки. Побочный эффект — некоторый спад производительности.
Текущая используемая конфигурация#
Конфигурационный файл zookeeper.properties для ZooKeeper следующий:
dataDir=/tmp/kafka/zookeeper
clientPort=2181
maxClientCnxns=0
Конфигурирование кластера ZooKeeper#
Конфигурирование кластера Apache ZooKeeper осуществляется в файле zookeeper.properties.
Минимальная конфигурация кластера#
Пример минимального конфигурационного файла zookeeper.properties для кластера из трех серверов:
server.1=zookeeper-1-ip:2888:3888
server.2=zookeeper-2-ip:2888:3888
server.3=zookeeper-3-ip:2888:3888
dataDir=/path/to/zookeeper-data
clientPort=2181
tickTime=2000
initLimit=5
syncLimit=2
Шаги создания конфигурационного файла:
Добавьте в файл переменные
server.INDEX=IP:PORT1:PORT2, где:INDEX— индекс (myid) узла ZooKeeper в кластере;IP— адрес данного сервера;PORT1— порт для подключения серверов-последователей в кластере к лидеру;PORT2— порт для подключения остальных сервисов ZooKeeper в момент выбора сервиса-лидера в кластере.
Если необходимо запустить кластер ZooKeeper на нескольких серверах, то в конфигурационном файле укажите адреса всех серверов.
Пример
server.1=10.XX.XX.X1:YYYY:ZZZZ server.2=10.XX.XX.X2:YYYY:ZZZZДля переменной
dataDirукажите путь к директории, в которую ZooKeeper будет сохранять файлы с данными, необходимыми ему для работы.Пример
dataDir=/var/log/kafka_2.11-0.9.0.0/zookeeper-dataДля переменной
clientPortукажите номер порта, к которому будут подключаться клиенты.Пример
clientPort=2181Для переменной
tickTimeукажите единицу измерения времени ZooKeeper — это время в миллисекундах, которое уходит на один «тик» или «такт» ZooKeeper.tickTimeнеявно используется в настройках для различных временных параметров ZooKeeper. Например, для временной настройкиinitLimit=10— это означает, чтоinitLimitбудет выполняться за10 * tickTimeмиллисекунд.По умолчанию
tickTime=2000миллисекунд.Пример
tickTime=2000Добавьте в файл переменную
initLimit— максимальное допустимое количество «тиков» (tickTime), которое может пройти, прежде чем узел кластера синхронизируется при запуске. При превышении будет сгенерирована ошибка.Пример
initLimit=10Т. е. максимальное допустимое время на синхронизацию узла кластера при запуске будет равно (
tickTime*initLimit) миллисекунд.Примечание
Это обязательное свойство, без которого узел ZooKeeper не будет запущен.
Добавьте в файл переменную
syncLimit— максимальное допустимое количество «тиков» (tickTime) между отправкой запроса и получением ответа, прежде чем будет сгенерировано исключение.Пример
syncLimit=5Примечание
Это обязательное свойство, без которого узел ZooKeeper не будет запущен.
На этом минимальное конфигурирование кластера ZooKeeper завершается.
Расширенная конфигурация кластера#
Существует ряд дополнительных настроек, которые можно включить в конфигурационный файл:
electionAlg— тип алгоритма для выбора лидера. Возможные значения:3— версия алгоритма для быстрого выбора лидера на основе TCP.
Начиная с ZooKeeper 3.6.0 доступна только версия алгоритма
3. Другие значения параметраelectionAlgне допускаются.Пример
electionAlg=3leaderServes— опция разрешает клиентам подключаться к серверу-лидеру. Значение по умолчанию —yes. Одна из основных задач сервера-лидера — следить за обновлениями и координировать. Отключением этой опции (установкой значенияno) можно добиться некоторого прироста производительности при координации обновлений.Пример
leaderServes=yesСвойства, используемые совместно:
group.x=nnnn[:nnnn]— позволяет построить кворум с иерархической структурой, гдеx— это идентификатор кворума, а в качестве значения указывается список идентификаторов серверов, разделенных двоеточием.Примечание
Группы не должны пересекаться, т. е. объединение всех групп должно давать полный список всех серверов.
weight.x=nnnnn— при использовании вместе сgroup.x=nnnn[:nnnn]позволяет назначить «вес» каждому серверу, который будет учитываться при выборе лидера. По умолчанию вес равен1.
Пример
group.1=1:2:3 group.2=4:5:6 group.3=7:8:9 weight.1=1 weight.2=1 weight.3=1 weight.4=1 weight.5=1 weight.6=1 weight.7=1 weight.8=1 weight.9=1
Следует учитывать, что запуск всех узлов кластера должен происходить за промежуток времени, не превышающий 10 тиков (heartbeat), заданных в строке initLimit=10. Поэтому лучше создать скрипт последовательного запуска всех узлов кластера с одной машины.
Запуск каждого узла ZooKeeper выполняется командой:
bin/zookeeper-server-start.sh -daemon
config/zookeeper.properties
Или с параллельным выводом файла логов в консоль:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail --f logs/zookeeper.out
Проверка работоспособности#
При запуске всех узлов кластера ZooKeeper в файлах logs/zookeeper.out должны быть следующие записи:
В файлах
logs/zookeeper.outсерверов-последователей:INFO FOLLOWING - LEADER ELECTION TOOK - 847 (org.apache.zookeeper.server.quorum.Learner)В файле
logs/zookeeper.outсервера-лидера:INFO LEADING - LEADER ELECTION TOOK - 408 (org.apache.zookeeper.server.quorum.Leader)
Также в консоли сервера-лидера должна быть запись, информирующая о создании кворума в кластере с перечислением ID узлов:
INFO Have quorum of supporters, sids: [ 1,2 ]; starting up and setting
last processed zxid: 0x100000000
(org.apache.zookeeper.server.quorum.Leader)
Тестовый запуск кластера ZooKeeper (2 узла)#
Тестовый запуск кластера ZooKeeper из двух узлов производится на двух удаленных серверах:
Server-1: `<host1-ip-address>`
Server-2: `<host2-ip-address>`
Дополнительная информация:
версия дистрибутива Corax:
kafka\2.11-0.9.0.0;версия ZooKeeper:
zookeeper-3.4.6;имя пользователя:
mon99usr;\$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где\$KAFKA_HOME— корневая директория дистрибутива Corax.
Для тестового запуска кластера ZooKeeper выполните следующие действия:
На каждом из серверов создайте директорию, где будут храниться данные ZooKeeper:
mkdir --p /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/На каждом из серверов создайте файл
myidв директории/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/, созданной на предыдущем шаге (1). Запишите в файл соответствующий уникальный идентификатор узла ZooKeeper:на сервере
Server-1в файлmyidзапишите1:echo 1 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 1на сервере
Server-2в файлmyidзапишите2:echo 2 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid 2
Внесите следующие настройки в файл конфигурации
KAFKA_DIR/config/zookeeper.propertiesдля обоих серверов:server.1=<host1-ip-address>:<port 1>:<port 2> server.2=<host2-ip-address>:<port 1>:<port 2> clientPort=2181 dataDir=/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/ maxClientCnxns=60 tickTime=2000 initLimit=10 syncLimit=5Где:
dataDir— директория для хранения данных ZooKeeper, которая была создана на шаге (1).server.1— адресServer-1; число1вserver.1— этоmyidсервераServer-1;server.2— адресServer-2; число2вserver.2— этоmyidсервераServer-2.
Запустите кластер ZooKeeper из двух узлов:
Выполните запуск узла ZooKeeper на
Server-1:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail --f logs/zookeeper.out bin/zookeeper-server-start.sh config/zookeeper.properties [2016-03-22 15:25:19,172] INFO Reading configuration from: config/zookeeper.properties [2016-03-22 15:25:19,174] WARN No server failure will be tolerated. You need at least 3 servers [2016-03-22 15:25:19,174] INFO Defaulting to majority quorums [2016-03-22 15:25:19,179] INFO autopurge.snapRetainCount set to 3 [2016-03-22 15:25:19,179] INFO autopurge.purgeInterval set to 0 [2016-03-22 15:25:19,179] INFO Purge task is not scheduled. [2016-03-22 15:25:19,198] INFO Starting quorum peer [2016-03-22 15:25:19,210] INFO binding to port 0.0.0.0/0.0.0.0:3181 [2016-03-22 15:25:19,234] INFO tickTime set to 2000 [2016-03-22 15:25:19,234] INFO minSessionTimeout set to -1 [2016-03-22 15:25:19,234] INFO maxSessionTimeout set to -1 [2016-03-22 15:25:19,234] INFO initLimit set to 10 [2016-03-22 15:25:19,247] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001 [2016-03-22 15:25:19,260] INFO My election bind port: <host1-ip-address>:3888 [2016-03-22 15:25:19,270] INFO New election. My id = 1, proposed zxid=0x700000001 [2016-03-22 15:25:19,272] INFO Notification: 1 (message format version), 1 (n.leader), 0x700000001 (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x8 (n.peerEpoch) LOOKING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection) [2016-03-22 15:25:19,284] WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) ... [2016-03-22 15:25:19,892] INFO Notification time out: 800 (org.apache.zookeeper.server.quorum.FastLeaderElection) [2016-03-22 15:25:20,172] INFO Received connection request /<host2-ip-address>:45491 (org.apache.zookeeper.server.quorum.QuorumCnxManager) [2016-03-22 15:25:20,379] INFO FOLLOWING [2016-03-22 15:25:20,384] INFO TCP NoDelay set to: true [2016-03-22 15:25:20,390] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [2016-03-22 15:25:20,390] INFO Server environment:host.name=SBT-IPO-203.youre.adress.ru [2016-03-22 15:25:20,390] INFO Server environment:java.version=1.8.0_60 [2016-03-22 15:25:20,390] INFO Server environment:java.home=/usr/java/jre1.8.0_60 [2016-03-22 15:25:20,390] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/.. [2016-03-22 15:25:20,391] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib [2016-03-22 15:25:20,391] INFO Server environment:java.io.tmpdir=/tmp [2016-03-22 15:25:20,391] INFO Server environment:java.compiler=\<NA\> [2016-03-22 15:25:20,391] INFO Server environment:os.name=Linux [2016-03-22 15:25:20,391] INFO Server environment:os.arch=amd64 [2016-03-22 15:25:20,391] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [2016-03-22 15:25:20,391] INFO Server environment:user.name=mon99usr [2016-03-22 15:25:20,391] INFO Server environment:user.home=/home/mon99usr [2016-03-22 15:25:20,391] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [2016-03-22 15:25:20,393] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [2016-03-22 15:25:20,394] INFO FOLLOWING - LEADER ELECTION TOOK - 1124 (org.apache.zookeeper.server.quorum.Learner) [2016-03-22 15:25:20,421] INFO Getting a diff from the leader 0x700000001 (org.apache.zookeeper.server.quorum.Learner) [2016-03-22 15:25:20,425] INFO Snapshotting: 0x700000001 to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001Запуск выполнен успешно.
Выполните запуск узла ZooKeeper на
Server-2:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail --f logs/zookeeper.out [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh config/zookeeper.properties [2016-03-22 15:25:19,874] INFO Reading configuration from: config/zookeeper.properties [2016-03-22 15:25:19,878] WARN No server failure will be tolerated. You need at least 3 servers. [2016-03-22 15:25:19,878] INFO Defaulting to majority quorums [2016-03-22 15:25:19,889] INFO autopurge.snapRetainCount set to 3 [2016-03-22 15:25:19,889] INFO autopurge.purgeInterval set to 0 [2016-03-22 15:25:19,889] INFO Purge task is not scheduled. [2016-03-22 15:25:19,941] INFO Starting quorum peer [2016-03-22 15:25:19,971] INFO binding to port 0.0.0.0/0.0.0.0:3181 [2016-03-22 15:25:20,037] INFO tickTime set to 2000 [2016-03-22 15:25:20,037] INFO minSessionTimeout set to -1 [2016-03-22 15:25:20,037] INFO maxSessionTimeout set to -1 [2016-03-22 15:25:20,038] INFO initLimit set to 10 [2016-03-22 15:25:20,128] INFO My election bind port: /<host2-ip-address>:3888 [2016-03-22 15:25:20,152] INFO LOOKING [2016-03-22 15:25:20,155] INFO New election. My id = 2, proposed zxid=0x700000001 [2016-03-22 15:25:20,176] INFO Notification: 1 (message format version), 2 (n.leader), 0x700000001 (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x8 (n.peerEpoch) LOOKING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection) ... [2016-03-22 15:25:20,381] INFO LEADING [2016-03-22 15:25:20,386] INFO TCP NoDelay set to: true [2016-03-22 15:25:20,395] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [2016-03-22 15:25:20,395] INFO Server environment:host.name=SBT-IPO-204.youre.adress.ru [2016-03-22 15:25:20,395] INFO Server environment:java.version=1.8.0_66 [2016-03-22 15:25:20,395] INFO Server environment:java.home=/usr/java/jre1.8.0_66 [2016-03-22 15:25:20,395] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../libs/scala- (org.apache.zookeeper.server.ZooKeeperServer) [2016-03-22 15:25:20,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib [2016-03-22 15:25:20,396] INFO Server environment:java.io.tmpdir=/tmp [2016-03-22 15:25:20,396] INFO Server environment:java.compiler=\<NA\> [2016-03-22 15:25:20,396] INFO Server environment:os.name=Linux [2016-03-22 15:25:20,396] INFO Server environment:os.arch=amd64 [2016-03-22 15:25:20,396] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [2016-03-22 15:25:20,396] INFO Server environment:user.name=mon99usr [2016-03-22 15:25:20,396] INFO Server environment:user.home=/home/mon99usr [2016-03-22 15:25:20,396] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [2016-03-22 15:25:20,397] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 (org.apache.zookeeper.server.ZooKeeperServer) [2016-03-22 15:25:20,398] INFO LEADING - LEADER ELECTION TOOK - 243 (org.apache.zookeeper.server.quorum.Leader) [2016-03-22 15:25:20,410] INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer\@64c4b35a [2016-03-22 15:25:20,419] INFO Synchronizing with Follower sid: 1 maxCommittedLog=0x700000001 minCommittedLog=0x300000001 peerLastZxid=0x700000001 [2016-03-22 15:25:20,419] INFO Sending DIFF [2016-03-22 15:25:20,431] INFO Received NEWLEADER-ACK message from 1 [2016-03-22 15:25:20,433] INFO Have quorum of supporters, sids: [ 1,2 ]; starting up and setting last processed zxid: 0x900000000 (org.apache.zookeeper.server.quorum.Leader)Запуск выполнен успешно.
Пояснения к логам ZooKeeper на сервере Server-1:
WARN No server failure will be tolerated. You need at least 3 serversВ случае отказа одного из серверов ZooKeeper сервис ZooKeeper не сможет продолжить свою работу, т. к. нарушено правило кворума о строгом большинстве доступных узлов. Т. е. число работоспособных узлов в ZooKeeper-кластере всегда должно быть больше числа неработоспособных узлов. В данном случае узла два и отказ одного из узлов приведет к неработоспособности сервиса.
WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888 java.net.ConnectException: Connection refusedОтказ при попытке соединения с
Server-2, т. к.Server-2в данный момент еще не был запущен.INFO Notification time out: 800 (org.apache.zookeeper.server.quorum.FastLeaderElection)Тайм-аут при попытке выбрать лидера.
INFO Received connection request /<host2-ip-address>:45491Получен ответ на запрос о соединении с сервером
Server-2.INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2Описание конфигурации созданного сервера.
INFO Snapshotting: 0x700000001 to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001Сохранение образа текущего состояния ZooKeeper.
INFO FOLLOWINGПосле успешного соединения
Server-1сServer-2:Server-1становится сервером-последователем;Server-2становится сервером-лидером.
FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn\'t match stored brokerId 1 in meta.propertiesДанная ошибка запуска ZooKeeper на сервере
Server-2происходит по причине конфликта в служебный файл ZooKeepermeta.propertiesзаписался неверный идентификатор брокераbroker.id=1, в то время как идентификаторmyidсервераServer-2указан, как равный двум. Решением проблемы будет удаление файлаmeta.properties, тогда при новом запуске ZooKeeper этот файл будет сгенерирован автоматически и с корректными настройками.
Пояснения к логам ZooKeeper на сервере Server-2:
INFO LOOKINGINFO New election. My id = 2, proposed zxid=0x700000001...INFO LEADING -- LEADING ELECTION TOOK -- 250Поиск сервера-лидера (т. к. сервер-лидер еще не выбран), голосование на выбор сервера-лидера (каждый из серверов предлагает себя), сервером-лидером выбран
Server-2, аServer-1автоматически становится сервером-последователем.WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888(org.apache.zookeeper.server.quorum.QuorumCnxManager)java.net.ConnectException: Connection refusedПопытка подключиться к отключенному серверу для выбора лидера. Сообщение возникает, если отключить
Server-1илиServer-2во время работы обоих серверов.
Тестовый запуск кластера ZooKeeper (3 узла)#
Тестовый запуск кластера ZooKeeper из трех узлов и кластера Corax из трех узлов будет производиться на трех удаленных серверах:
Server-1: <host1-ip-address> [ 1 узел ZooKeeper, 1 Брокер Kafka ]
Server-2: <host2-ip-address> [ 1 узел ZooKeeper, 1 Брокер Kafka ]
Server-3: <host3-ip-address> [ 1 узел ZooKeeper, 1 Брокер Kafka ]
Дополнительная информация:
версия дистрибутива Corax: kafka\2.11-0.9.0.0;
версия ZooKeeper: zookeeper-3.4.6;
имя пользователя:
mon99usr;\$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где\$KAFKA_HOME— домашняя корневая директория дистрибутива Corax.
Конфигурация кластера ZooKeeper из трех узлов
Каждый узел располагается на отдельном сервере.
Для конфигурации кластера ZooKeeper выполните следующие действия:
На каждом из серверов создайте директорию, где будут храниться данные ZooKeeper:
mkdir --p /var/tmp/kafka/zookeeper-data/На каждом из серверов создайте файл
myidв директории/var/tmp/kafka/zookeeper-data/, созданной на предыдущем шаге (1). Это уникальный идентификатор каждого узла ZooKeeper, запишите его в файл:на сервере
Server-1в файлmyidзапишите1:echo 1 > /var/tmp/kafka/zookeeper-data/myid cat /var/tmp/kafka/zookeeper-data/myid 1на сервере
Server-2в файлmyidзапишите2:echo 2 > /var/tmp/kafka/zookeeper-data/myid cat /var/tmp/kafka/zookeeper-data/myid 2на сервере
Server-3в файлmyidзапишите3:echo 3 > /var/tmp/kafka/zookeeper-data/myid cat /var/tmp/kafka/zookeeper-data/myid 3
Отредактируйте файл конфигурации ZooKeeper.
KAFKA_DIR/config/zookeeper.propertiesдля трех серверов одинаков. Укажите корректные значения для параметров:###################[ MAIN ]################### dataDir=data/zookeeper-data clientPort=2181 ##############[ CLUSTER MODE ON ]############# server.1=<host1-ip-address>:2888:3888 server.2=<host2-ip-address>:2888:3888 server.3=<host3-ip-address>:2888:3888 tickTime=2000 initLimit=10 syncLimit=5Где:
dataDir— директория для хранения данных ZooKeeper;clientPort— порт, к которому подключаются клиенты ZooKeeper. Для PLAINTEXT — 2181, для SSL — 2182.server.1— адресServer-1; число1вserver.1— этоmyidсервераServer-1;server.2— адресServer-2; число2вserver.2— этоmyidсервераServer-2;server.3— адресServer-3; число3вServer.3— этоmyidсервераServer-3.
Запуск кластера ZooKeeper из трех узлов
Выполните запуск узла ZooKeeper на сервере
Server-1в фоновом режиме с выводом логов:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [2016-03-28 12:56:43,375] INFO Reading configuration from: config/zookeeper.properties [2016-03-28 12:56:43,377] INFO Defaulting to majority quorums [2016-03-28 12:56:43,382] INFO autopurge.snapRetainCount set to 3 [2016-03-28 12:56:43,382] INFO autopurge.purgeInterval set to 0 [2016-03-28 12:56:43,382] INFO Purge task is not scheduled. [2016-03-28 12:56:43,411] INFO Starting quorum peer [2016-03-28 12:56:43,424] INFO binding to port 0.0.0.0/0.0.0.0:3181 [2016-03-28 12:56:43,447] INFO tickTime set to 2000 [2016-03-28 12:56:43,448] INFO minSessionTimeout set to -1 [2016-03-28 12:56:43,448] INFO maxSessionTimeout set to -1 [2016-03-28 12:56:43,448] INFO initLimit set to 10 [2016-03-28 12:56:43,467] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac [2016-03-28 12:56:43,507] INFO My election bind port: /<host1-ip-address>:3888 [2016-03-28 12:56:43,518] INFO LOOKING [2016-03-28 12:56:43,520] INFO New election. My id = 1, proposed zxid=0x310000003d [2016-03-28 12:56:43,522] INFO Notification: 1 (message format version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [2016-03-28 12:56:43,530] WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888 java.net.ConnectException: Connection refused [2016-03-28 12:56:43,534] WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888 java.net.ConnectException: Connection refused [2016-03-28 12:56:56,388] INFO Received connection request /<host2-ip-address>:11287 (org.apache.zookeeper.server.quorum.QuorumCnxManager) [2016-03-28 12:56:56,595] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer) [2016-03-28 12:56:56,600] INFO TCP NoDelay set to: true [2016-03-28 12:56:56,606] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [2016-03-28 12:56:56,607] INFO Server environment:host.name=SBT-IPO-203.youre.adress.ru [2016-03-28 12:56:56,607] INFO Server environment:java.version=1.8.0_60 [2016-03-28 12:56:56,607] INFO Server environment:java.home=/usr/java/jre1.8.0_60 [2016-03-28 12:56:56,607] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../.... [2016-03-28 12:56:56,607] INFO Server environment:java.library.path=/usr/java/packages/... [2016-03-28 12:56:56,607] INFO Server environment:java.io.tmpdir=/tmp [2016-03-28 12:56:56,607] INFO Server environment:java.compiler=\<NA\> [2016-03-28 12:56:56,607] INFO Server environment:os.name=Linux [2016-03-28 12:56:56,607] INFO Server environment:os.arch=amd64 [2016-03-28 12:56:56,607] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [2016-03-28 12:56:56,607] INFO Server environment:user.name=mon99usr [2016-03-28 12:56:56,607] INFO Server environment:user.home=/home/mon99usr [2016-03-28 12:56:56,607] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [2016-03-28 12:56:56,609] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 Datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [2016-03-28 12:56:56,609] INFO FOLLOWING - LEADER ELECTION TOOK - 13090 (org.apache.zookeeper.server.quorum.Learner) [2016-03-28 12:56:56,616] WARN Unexpected exception, tries=0, connecting to /<host2-ip-address>:2888 java.net.ConnectException: Connection refused ... [2016-03-28 12:56:57,640] INFO Getting a diff from the leader 0x310000003d [2016-03-28 12:56:57,645] INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003d [2016-03-28 12:57:04,004] WARN Got zxid 0x3200000001 expected 0x1 [2016-03-28 12:57:04,004] INFO Creating new log file: log.3200000001 [2016-03-28 12:57:06,472] INFO Received connection request /<host3-ip-address>:47171 [2016-03-28 12:57:06,475] INFO Notification: 1 (message format version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) FOLLOWING (my state)Где
tail -f logs/zookeeper.out— вывод текущего лога ZooKeeper-брокера из файла в реальном времени.Запуск ZooKeeper-брокера на
Server-1выполнен успешно.Краткий анализ логов
Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере
<host2-ip-address>и с Брокером на сервере <host3-ip-address>, так как Брокеры на этих серверах еще не запущены:INFO LOOKING WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888 java.net.ConnectException: Connection refused WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888 java.net.ConnectException: Connection refusedПринят запрос на соединение от Брокера на сервере
Server-2по адресу<host2-ip-address>в связи со запуском этого Брокера:INFO Received connection request /<host2-ip-address>:11287Как правило, после этого сообщения новый Брокер включается в кластер.
Выбран сервер-лидер — это только что запущенный Брокер на сервере
Server-2. Брокер на текущем сервереServer-1выбран в качестве сервера-последователя (FOLLOWER):INFO FOLLOWINGСинхронизация Брокера-последователя на сервере
Server-1с Брокером-лидером на сервереServer-2: получение от сервера-лидераServer-2текущего состояния системы (diff). В результате на сервереServer-1будет создан и сохранен соответствующий образ (snapshot) только что полученных данных:INFO Getting a diff from the leader 0x310000003d INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003dПринят запрос на соединение от Брокера на сервере
Server-3по адресу<host3-ip-address>в связи с запуском этого Брокера:INFO Received connection request /<host3-ip-address>:11287С этого момента новый Брокер включается в кластер.
После сообщения
INFO Notification: 1...вывод логов прекращается в связи с отсутствием дальнейших событий.
Выполните запуск узла ZooKeeper на сервере
Server-2в фоновом режиме с выводом логов:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [2016-03-28 12:56:56,055] INFO Reading configuration from: config/zookeeper.properties [2016-03-28 12:56:56,058] INFO Defaulting to majority quorums [2016-03-28 12:56:56,067] INFO autopurge.snapRetainCount set to 3 [2016-03-28 12:56:56,067] INFO autopurge.purgeInterval set to 0 [2016-03-28 12:56:56,067] INFO Purge task is not scheduled. [2016-03-28 12:56:56,116] INFO Starting quorum peer [2016-03-28 12:56:56,135] INFO binding to port 0.0.0.0/0.0.0.0:3181 [2016-03-28 12:56:56,184] INFO tickTime set to 2000 [2016-03-28 12:56:56,185] INFO minSessionTimeout set to -1 [2016-03-28 12:56:56,185] INFO maxSessionTimeout set to -1 [2016-03-28 12:56:56,185] INFO initLimit set to 10 [2016-03-28 12:56:56,211] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.2a000000a9 [2016-03-28 12:56:56,375] INFO My election bind port: /<host2-ip-address>:3888 [2016-03-28 12:56:56,378] INFO LOOKING (org.apache.zookeeper.server.quorum.QuorumPeer) [2016-03-28 12:56:56,380] INFO New election. My id = 2, proposed zxid=0x310000003d [2016-03-28 12:56:56,394] INFO Notification: 1 (message format version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [2016-03-28 12:56:56,396] WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) ... [2016-03-28 12:56:56,398] INFO Notification: 1 (message format version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [2016-03-28 12:56:56,399] INFO Notification: 1 (message format version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state) [2016-03-28 12:56:56,601] INFO LEADING [2016-03-28 12:56:56,605] INFO TCP NoDelay set to: true [2016-03-28 12:56:56,615] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT [2016-03-28 12:56:56,615] INFO Server environment:host.name=SBT-IPO-204.youre.adress.ru [2016-03-28 12:56:56,615] INFO Server environment:java.version=1.8.0_66 [2016-03-28 12:56:56,615] INFO Server environment:java.home=/usr/java/jre1.8.0_66 [2016-03-28 12:56:56,615] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/.... [2016-03-28 12:56:56,616] INFO Server environment:java.library.path=/usr/java/... [2016-03-28 12:56:56,616] INFO Server environment:java.io.tmpdir=/tmp [2016-03-28 12:56:56,616] INFO Server environment:java.compiler=\<NA\> [2016-03-28 12:56:56,616] INFO Server environment:os.name=Linux [2016-03-28 12:56:56,616] INFO Server environment:os.arch=amd64 [2016-03-28 12:56:56,616] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64 [2016-03-28 12:56:56,616] INFO Server environment:user.name=mon99usr [2016-03-28 12:56:56,616] INFO Server environment:user.home=/home/mon99usr [2016-03-28 12:56:56,616] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0 [2016-03-28 12:56:56,618] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [2016-03-28 12:56:56,619] INFO LEADING - LEADER ELECTION TOOK - 239 [2016-03-28 12:56:57,627] INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$... [2016-03-28 12:56:57,638] INFO Synchronizing with Follower sid: 1 maxCommittedLog=0x310000003d minCommittedLog=0x2e0000006f peerLastZxid=0x310000003d [2016-03-28 12:56:57,638] INFO Sending DIFF [2016-03-28 12:56:57,666] INFO Received NEWLEADER-ACK message from 1 [2016-03-28 12:56:57,669] INFO Have quorum of supporters, sids: [ 1,2]; starting up and setting last processed zxid: .. [2016-03-28 12:57:04,000] INFO Expiring session 0x253bc816e870000, timeout of 6000ms exceeded [2016-03-28 12:57:04,001] INFO Processed session termination for sessionid: 0x253bc816e870000 [2016-03-28 12:57:04,002] INFO Creating new log file: log.3200000001 [2016-03-28 12:57:06,475] INFO Received connection request /<host3-ip-address>:34164 [2016-03-28 12:57:06,477] INFO Notification: 1 (message format version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) LEADING (my state) [2016-03-28 12:57:06,513] INFO Follower sid: 3 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer@... [2016-03-28 12:57:06,517] INFO Synchronizing with Follower sid: 3 maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070 peerLastZxid=0x310000003d [2016-03-28 12:57:06,518] INFO Sending DIFF [2016-03-28 12:57:06,540] INFO Received NEWLEADER-ACK message from 3Запуск брокера на сервере
Server-2выполнен успешно.Краткий анализ логов
Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере <host3-ip-address>, так как Брокер на этом сервере еще не запущен:
INFO LOOKING WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888 java.net.ConnectException: Connection refusedЛидером выбран Брокер на сервере
Server-2(на выбор лидера ушло 239 миллисекунд). От Брокера на сервереServer-1получено соответствующее подтверждение. Выведено сообщение, что создан новый кворум [1,2] из двух источников-брокеров с идентификаторамиmyid=1и2соответственно.INFO LEADING INFO LEADING - LEADER ELECTION TOOK -- 239 INFO Have quorum of supporters, sids: [ 1,2 ];Синхронизация Брокера-последователя на сервере
Server-1с Брокером-лидером на сервереServer-2. Брокер-лидер отправляет текущее состояние системы (DIFF) и тем самым запускает синхронизацию. После обработкиDIFFБрокером-последователем (сохраненияDIFFв виде образа (snapshot) текущего состояния системы) от Брокера-последователя поступает соответствующее подтверждение о полученииReceived NEWLEADER-ACK:INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.Q... INFO Synchronizing with Follower sid: 1 INFO Sending DIFF INFO Received NEWLEADER-ACK message from 1Принят запрос на соединение от нового Брокера на сервере
Server-3по адресу<host3-ip-address>в связи со запуском этого Брокера:INFO Received connection request /<host3-ip-address>:34164Новый Брокер с идентификатором
myid=3принят в кворум в качестве Брокера-последователя:INFO Follower sid: 3Синхронизация Брокера-последователя на сервере
Server-3с Брокером-лидером на сервереServer-2. Брокеру-последователю отправлено текущее состояние системы (DIFF), успешно обработано Брокером-последователем, в ответ выслано соответствующее подтверждение:INFO Synchronizing with Follower sid: 3 maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070 peerLastZxid=0x310000003d [2016-03-28 12:57:06,518] INFO Sending DIFF [2016-03-28 12:57:06,540] INFO Received NEWLEADER-ACK message from 3
Выполните запуск узла ZooKeeper на сервере
Server-3в фоновом режиме с выводом логов:> bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [mon99usr\@SBT-IPO-208 kafka_2.11-0.9.0.0]\$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out [2016-03-28 12:57:07,273] INFO Reading configuration from: config/zookeeper.properties [2016-03-28 12:57:07,276] INFO Defaulting to majority quorums [2016-03-28 12:57:07,282] INFO autopurge.snapRetainCount set to 3 [2016-03-28 12:57:07,282] INFO autopurge.purgeInterval set to 0 [2016-03-28 12:57:07,282] INFO Purge task is not scheduled. [2016-03-28 12:57:07,303] INFO Starting quorum peer [2016-03-28 12:57:07,321] INFO binding to port 0.0.0.0/0.0.0.0:3181 [2016-03-28 12:57:07,357] INFO tickTime set to 2000 [2016-03-28 12:57:07,357] INFO minSessionTimeout set to -1 [2016-03-28 12:57:07,357] INFO maxSessionTimeout set to -1 [2016-03-28 12:57:07,357] INFO initLimit set to 10 [2016-03-28 12:57:07,378] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac [2016-03-28 12:57:07,430] INFO My election bind port: /<host3-ip-address>:3888 [2016-03-28 12:57:07,444] INFO LOOKING [2016-03-28 12:57:07,445] INFO New election. My id = 3, proposed zxid=0x31000003d [2016-03-28 12:57:07,459] INFO Notification: 1 (message format version), 3 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31(n.peerEpoch) LOOKING (my state) [2016-03-28 12:57:07,460] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31(n.peerEpoch) LOOKING (my state) [2016-03-28 12:57:07,460] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), FOLLOWING (n.state), 1 (n.sid), 0x2 (n.peerEpoch) LOOKING (my state) [2016-03-28 12:57:07,461] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x31(n.peerEpoch) LOOKING (my state) [2016-03-28 12:57:07,462] INFO Notification: 1 (message format version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LEADING (n.state), 2 (n.sid), 0x32(n.peerEpoch) LOOKING (my state) [2016-03-28 12:57:07,462] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer) [2016-03-28 12:57:07,469] INFO TCP NoDelay set to: true [2016-03-28 12:57:07,478] INFO Server environment:zookeeper.version=3.4.6-156995, built on 02/20/2014 09:09 GMT [2016-03-28 12:57:07,478] INFO Server environment:host.name=SBT-IPO-208.youre.adress.ru [2016-03-28 12:57:07,487] INFO Server environment:java.version=1.8.0_71 [2016-03-28 12:57:07,487] INFO Server environment:java.home=/usr/java/jre1.8.0_1 [2016-03-28 12:57:07,487] INFO Server environment:java.class.path=:/home/mon99ur/... [2016-03-28 12:57:07,488] INFO Server environment:java.library.path=/usr/java/packages/... [2016-03-28 12:57:07,488] INFO Server environment:java.io.tmpdir=/tmp [2016-03-28 12:57:07,488] INFO Server environment:java.compiler=\<NA\> [2016-03-28 12:57:07,488] INFO Server environment:os.name=Linux [2016-03-28 12:57:07,488] INFO Server environment:os.arch=amd64 [2016-03-28 12:57:07,488] INFO Server environment:os.version=3.10.0-123.9.3.el7x86_64 [2016-03-28 12:57:07,488] INFO Server environment:user.name=mon99usr [2016-03-28 12:57:07,488] INFO Server environment:user.home=/home/mon99usr [2016-03-28 12:57:07,488] INFO Server environment:user.dir=/home/mon99usr/kafka2.11-0.9.0.0 [2016-03-28 12:57:07,490] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 [2016-03-28 12:57:07,491] INFO FOLLOWING - LEADER ELECTION TOOK - 46 [2016-03-28 12:57:07,502] INFO Getting a diff from the leader 0x3200000001 [2016-03-28 12:57:07,506] WARN Got zxid 0x3200000001 expected 0x1 [2016-03-28 12:57:07,508] INFO Snapshotting: 0x3200000001 to /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001Запуск брокера на сервере
Server-3выполнен успешно.Краткий анализ логов
Поиск других Брокеров с целью выбора лидера, инициация алгоритма выбора лидера, предложение своей кандидатуры в качестве лидера аналогичны инициализации двух предыдущих Брокеров:
INFO LOOKING INFO New election. My id = 3, proposed zxid=0x31000003dБрокер на сервере
Server-3выбран в качестве Брокера-последователя:INFO FOLLOWING INFO FOLLOWING - LEADER ELECTION TOOK -- 46Синхронизация с Брокером на сервере
Server-2. Получение состояния системы и сохранение образа:INFO Getting a diff from the leader 0x3200000001 INFO Snapshotting: 0x3200000001 to /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001
Конфигурация кластера Corax из трех узлов
Каждый узел расположен на отдельном сервере.
В целях тестирования доступности сервиса ZooKeeper с различным количеством работающих узлов сконфигурируйте кластер Corax (в сценарии рассмотрена конфигурация кластера при установке Corax по профилю no-auth):
Отредактируйте файл конфигурации
KAFKA_DIR/config/server.propertiesдля каждого Corax-брокера. Укажите корректные значения дляbroker.idиzookeeper.connect:Для сервера
Server-1:##########################[ MAIN ]######################### broker.id=1 log.dirs=data/kafka-data ########################[ ZOOKEEPER ]###################### zookeeper.connect=<host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 zookeeper.connection.timeout.ms=6000 zookeeper.session.timeout.ms=18000 #####################[ AUTHENTICATION ]#################### listeners=PLAINTEXT://<host1-ip-address>:9092Для сервера
Server-2:##########################[ MAIN ]######################### broker.id=2 log.dirs=data/kafka-data ########################[ ZOOKEEPER ]###################### zookeeper.connect=<host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 zookeeper.connection.timeout.ms=6000 zookeeper.session.timeout.ms=18000 #####################[ AUTHENTICATION ]#################### listeners=PLAINTEXT://<host2-ip-address>:9092Для сервера
Server-3:##########################[ MAIN ]######################### broker.id=3 log.dirs=data/kafka-data ########################[ ZOOKEEPER ]###################### zookeeper.connect=<host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 zookeeper.connection.timeout.ms=6000 zookeeper.session.timeout.ms=18000 #####################[ AUTHENTICATION ]#################### listeners=PLAINTEXT://<host3-ip-address>:9092
Где:
broker.id— идентификатор Corax-брокера, обязательно должен быть уникальным в рамках кластера Corax;log.dirs— директория, в которой хранятся логи Corax;listeners— IP-адрес сервера, на котором находится Corax-брокер в форматеPROTOCOL://IP:PORT:PROTOCOL— протокол подключения к брокеру. Может принимать значенияPLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL;IP— адрес сервера, на котором выполняется Corax-брокер;PORT— порт, к которому подключаются клиенты Corax. Для PLAINTEXT — 9092, для SSL — 9093.
zookeeper.connect— список адресов ZooKeeper-брокеров в форматеIP:PORT, где:IP— адрес сервера, на котором выполняется ZooKeeper-брокер;PORT— порт, к которому подключаются клиенты ZooKeeper. Для PLAINTEXT — 2181, для SSL — 2182.
Запуск кластера Corax из трех узлов
Выполните запуск узла Corax на сервере
Server-1в фоновом режиме с выводом логов:bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [mon99usr\@SBT-IPO-203 kafka]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [2016-03-28 17:16:50,514] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 11 milliseconds. [2016-03-28 17:16:50,531] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:16:50,532] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:16:50,537] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [2016-03-28 17:16:50,549] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:16:50,555] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:16:50,556] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(<host1-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2016-03-28 17:16:50,571] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [2016-03-28 17:17:06,309] INFO KafkaConfig values: advertised.host.name = null metric.reporters = [] quota.producer.default = 9223372036854775807 offsets.topic.num.partitions = 50 log.flush.interval.messages = 9223372036854775807 auto.create.topics.enable = true controller.socket.timeout.ms = 30000 log.flush.interval.ms = null principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder replica.socket.receive.buffer.bytes = 65536 min.insync.replicas = 1 replica.fetch.wait.max.ms = 500 num.recovery.threads.per.data.dir = 1 ssl.keystore.type = JKS default.replication.factor = 1 ssl.truststore.password = null log.preallocate = false sasl.kerberos.principal.to.local.rules = [DEFAULT] fetch.purgatory.purge.interval.requests = 1000 ssl.endpoint.identification.algorithm = null replica.socket.timeout.ms = 30000 message.max.bytes = 1000012 num.io.threads = 8 offsets.commit.required.acks = -1 log.flush.offset.checkpoint.interval.ms = 60000 delete.topic.enable = false quota.window.size.seconds = 1 ssl.truststore.type = JKS offsets.commit.timeout.ms = 5000 quota.window.num = 11 zookeeper.connect = <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 authorizer.class.name = num.replica.fetchers = 1 log.retention.ms = null log.roll.jitter.hours = 0 log.cleaner.enable = false offsets.load.buffer.size = 5242880 log.cleaner.delete.retention.ms = 86400000 ssl.client.auth = none controlled.shutdown.max.retries = 3 queued.max.requests = 500 offsets.topic.replication.factor = 3 log.cleaner.threads = 1 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 socket.request.max.bytes = 104857600 ssl.trustmanager.algorithm = PKIX zookeeper.session.timeout.ms = 6000 log.retention.bytes = -1 sasl.kerberos.min.time.before.relogin = 60000 zookeeper.set.acl = false connections.max.idle.ms = 600000 offsets.retention.minutes = 1440 replica.fetch.backoff.ms = 1000 inter.broker.protocol.version = 0.9.0.X log.retention.hours = 168 num.partitions = 1 listeners = null ssl.provider = null ssl.enabled.protocols = [TLSv1.3, TLSv1.2] log.roll.ms = null log.flush.scheduler.interval.ms = 9223372036854775807 ssl.cipher.suites = null log.index.size.max.bytes = 10485760 ssl.keymanager.algorithm = SunX509 security.inter.broker.protocol = PLAINTEXT replica.fetch.max.bytes = 1048576 advertised.port = null log.cleaner.dedupe.buffer.size = 524288000 replica.high.watermark.checkpoint.interval.ms = 5000 log.cleaner.io.buffer.size = 524288 sasl.kerberos.ticket.renew.window.factor = 0.8 zookeeper.connection.timeout.ms = null controlled.shutdown.retry.backoff.ms = 5000 log.roll.hours = 168 log.cleanup.policy = delete host.name = <host1-ip-address> log.roll.jitter.ms = null max.connections.per.ip = 2147483647 offsets.topic.segment.bytes = 104857600 background.threads = 10 quota.consumer.default = 9223372036854775807 request.timeout.ms = 30000 log.index.interval.bytes = 4096 log.dir = /tmp/kafka-logs log.segment.bytes = 1073741824 log.cleaner.backoff.ms = 15000 offset.metadata.max.bytes = 4096 ssl.truststore.location = null group.max.session.timeout.ms = 30000 ssl.keystore.password = null zookeeper.sync.time.ms = 2000 port = 9092 log.retention.minutes = null log.segment.delete.delay.ms = 60000 log.dirs = /var/tmp/kafka/kafka-logs controlled.shutdown.enable = true compression.type = producer max.connections.per.ip.overrides = sasl.kerberos.kinit.cmd = /usr/bin/kinit log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 auto.leader.rebalance.enable = true leader.imbalance.check.interval.seconds = 300 log.cleaner.min.cleanable.ratio = 0.5 replica.lag.time.max.ms = 10000 num.network.threads = 3 ssl.key.password = null reserved.broker.max.id = 1000 metrics.num.samples = 2 socket.send.buffer.bytes = 102400 ssl.protocol = TLS socket.receive.buffer.bytes = 102400 ssl.keystore.location = null replica.fetch.min.bytes = 1 unclean.leader.election.enable = true group.min.session.timeout.ms = 6000 log.cleaner.io.buffer.load.factor = 0.9 offsets.retention.check.interval.ms = 600000 producer.purgatory.purge.interval.requests = 1000 metrics.sample.window.ms = 30000 broker.id = 1 offsets.topic.compression.codec = 0 log.retention.check.interval.ms = 300000 advertised.listeners = null leader.imbalance.per.broker.percentage = 10 (kafka.server.KafkaConfig) [2016-03-28 17:17:06,377] INFO starting (kafka.server.KafkaServer) [2016-03-28 17:17:06,382] INFO Connecting to zookeeper on <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 (kafka.server.KafkaServer) [2016-03-28 17:17:06,748] INFO Loading logs. (kafka.log.LogManager) [2016-03-28 17:17:06,986] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-0/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log) [2016-03-28 17:17:06,987] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log) [2016-03-28 17:17:06,988] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log) [2016-03-28 17:17:06,990] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-1/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log) [2016-03-28 17:17:06,991] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log) [2016-03-28 17:17:06,992] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log) [2016-03-28 17:17:06,994] INFO Logs loading complete. (kafka.log.LogManager) [2016-03-28 17:17:06,995] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2016-03-28 17:17:06,996] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2016-03-28 17:17:07,040] INFO Awaiting socket connections on SBT-IPO-203.youre.adress.ru:9092. (kafka.network.Acceptor) [2016-03-28 17:17:07,043] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer) [2016-03-28 17:17:07,062] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:17:07,063] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:17:07,124] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:17:07,142] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:17:07,142] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2016-03-28 17:17:09,020] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener) [2016-03-28 17:17:09,051] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.GroupCoordinator) [2016-03-28 17:17:09,053] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:17:09,053] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.GroupCoordinator) [2016-03-28 17:17:09,057] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:17:09,060] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.GroupMetadataManager) [2016-03-28 17:17:09,075] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:17:09,084] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [2016-03-28 17:17:09,128] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:17:09,138] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:17:09,139] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -\> EndPoint(<host1-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2016-03-28 17:17:09,151] INFO [Kafka Server 1], started (kafka.server.KafkaServer) [2016-03-28 17:17:09,409] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [NUMBERS,0],[NUMBERS,1] (kafka.server.ReplicaFetcherManager) [2016-03-28 17:17:10,584] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions[NUMBERS,0] (kafka.server.ReplicaFetcherManager)Где
tail -f logs/server.log— вывод текущего лога Corax-брокера из файла в реальном времени.Запуск Corax-брокера на сервере
Server-1выполнен успешно.Выполните запуск узла ZooKeeper на сервере
Server-2в фоновом режиме с выводом логов:bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [mon99usr\@SBT-IPO-204 kafka]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [2016-03-28 17:35:35,726] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,36] [2016-03-28 17:35:35,728] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,36] in 2 milliseconds. [2016-03-28 17:36:11,334] INFO KafkaConfig values: .... (kafka.server.KafkaConfig) [2016-03-28 17:36:11,441] INFO starting (kafka.server.KafkaServer) [2016-03-28 17:36:11,450] INFO Connecting to zookeeper on <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 (kafka.server.KafkaServer) [2016-03-28 17:36:11,870] INFO Loading logs. (kafka.log.LogManager) [2016-03-28 17:36:12,775] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-1/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log) [2016-03-28 17:36:12,776] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log) [2016-03-28 17:36:12,777] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log) [2016-03-28 17:36:12,780] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-0/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log) [2016-03-28 17:36:12,781] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log) [2016-03-28 17:36:12,782] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log) [2016-03-28 17:36:12,785] INFO Logs loading complete. (kafka.log.LogManager) [2016-03-28 17:36:12,786] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2016-03-28 17:36:12,799] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2016-03-28 17:36:12,858] INFO Awaiting socket connections on SBT-IPO-204.youre.adress.ru:9092. (kafka.network.Acceptor) [2016-03-28 17:36:12,861] INFO [Socket Server on Broker 2], Started 1 acceptor threads (kafka.network.SocketServer) [2016-03-28 17:36:13,054] INFO [GroupCoordinator 2]: Starting up. (kafka.coordinator.GroupCoordinator) [2016-03-28 17:36:13,064] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:36:13,064] INFO [GroupCoordinator 2]: Startup complete. (kafka.coordinator.GroupCoordinator) [2016-03-28 17:36:13,073] INFO [Group Metadata Manager on Broker 2]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager) [2016-03-28 17:36:13,075] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:36:13,087] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:36:13,088] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:36:13,094] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [2016-03-28 17:36:13,119] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:36:13,133] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:36:13,135] INFO Registered broker 2 at path /brokers/ids/2 with addresses: PLAINTEXT -\> EndPoint(<host2-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2016-03-28 17:36:13,149] INFO [Kafka Server 2], started (kafka.server.KafkaServer) [2016-03-28 17:36:15,082] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test2,1],[test,14],[test2,13],[test,15],[test,3],[test2,11],[test,6],[test5,1],[test2,5],[ThrashMetal,2],[test,17],[test5,0],[test,5],[test,11],[test3,1],[test2,15],[test2,18],[test2,8],[test,16],[test,1],[NUMBERS,0],[test,4],[test,19],[test2,3],[test2,10],[ThrashMetal,0],[test,18],[test,13],[test2,2],[test2,17],[test,12],[test2,7],[test,2],[test,8],[test2,16],[NUMBERS,1],[test2,6],[test,0],[test2,4],[test2,0],[test,7],[test2,9],[test,9],[superTest,1],[test3,0],[test2,14],[ThrashMetal,1],[superTest,0],[test2,19],[test,10],[test2,12] (kafka.server.ReplicaFetcherManager) [2016-03-28 17:36:15,096] INFO Truncating log NUMBERS-0 to offset 13. (kafka.log.Log) [2016-03-28 17:36:15,100] INFO Truncating log NUMBERS-1 to offset 14. (kafka.log.Log) [2016-03-28 17:36:15,162] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([[NUMBERS,1], ...) initOffset 14 to broker BrokerEndPoint(1,<host1-ip-address>,9092)] [2016-03-28 17:36:15,171] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread) [2016-03-28 17:36:15,354] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,22] (kafka.coordinator.GroupMetadataManager) [2016-03-28 17:36:15,424] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,22] in 69 milliseconds. (kafka.coordinator.GroupMetadataManager) [2016-03-28 17:37:14,092] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [NUMBERS,1] (kafka.server.ReplicaFetcherManager)Запуск Corax-брокера на сервере
Server-2выполнен успешно.Выполните запуск узла Corax на сервере
Server-3в фоновом режиме с выводом логов:bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log [mon99usr\@SBT-IPO-208 kafka]\$ bin/kafka-server-start.sh -daemon config/server-3-servers.properties && tail -f logs/server.log Kafka application log tail -f from /home/mon99usr/kafka/logs/server.log [ [2016-03-28 17:45:56,259] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:45:56,280] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:45:56,281] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(<host3-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2016-03-28 17:45:56,300] INFO [Kafka Server 3], started (kafka.server.KafkaServer) [2016-03-28 17:50:45,138] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener) [2016-03-28 17:50:57,090] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:50:57,096] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:50:57,097] INFO 3 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2016-03-28 17:51:00,733] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener) [2016-03-28 17:52:37,509] INFO KafkaConfig values: ... (kafka.server.KafkaConfig) [2016-03-28 17:52:37,675] INFO starting (kafka.server.KafkaServer) [2016-03-28 17:52:37,686] INFO Connecting to zookeeper on <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 (kafka.server.KafkaServer) [2016-03-28 17:52:38,083] INFO Loading logs. [2016-03-28 17:52:38,090] INFO Logs loading complete. [2016-03-28 17:52:38,091] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2016-03-28 17:52:38,103] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2016-03-28 17:52:38,163] INFO Awaiting socket connections on SBT-IPO-208.youre.adress.ru:9092. (kafka.network.Acceptor) [2016-03-28 17:52:38,169] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer) [2016-03-28 17:52:38,199] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:52:38,203] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:52:38,378] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator) [2016-03-28 17:52:38,394] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 13 milliseconds. (kafka.coordinator.GroupMetadataManager) [2016-03-28 17:52:38,396] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:52:38,397] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator) [2016-03-28 17:52:38,407] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper) [2016-03-28 17:52:38,432] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:52:38,437] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper) [2016-03-28 17:52:38,445] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$) [2016-03-28 17:52:38,485] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:52:38,504] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-03-28 17:52:38,506] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -\> EndPoint(<host3-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2016-03-28 17:52:38,524] INFO [Kafka Server 3], started (kafka.server.KafkaServer)Запуск Corax-брокера на сервере
Server-3выполнен успешно.
Запуск кластеров Corax и ZooKeeper выполнен успешно.
Тест доступности ZooKeeper / работоспособности Corax при различных условиях работы кластера ZooKeeper
Тест работоспособности Corax при всех работающих узлах кластера ZooKeeper.
Вывод списка топиков:
bin/kafka-topics.sh --list --bootstrap-server <host1-ip-address>:<port>,<host2-ip-address>:<port>,<host3-ip-address>:<port> NUMBERSВ списке содержится топик
NUMBERS.Отправка сообщений в топик
NUMBERS:bin/kafka-console-producer.sh --bootstrap-server <host1-ip-address>:<port>,<host2-ip-address>:<port>,<host3-ip-address>:<port> --topic NUMBERS 1 2 3Запись в топик успешна.
Чтение сообщений из топика:
bin/kafka-console-consumer.sh --group test-group --bootstrap-server <host2-ip-address>:9092 --topic NUMBERS --from-beginning 1 2 3Чтение из топика успешно.
Тест работоспособности Corax при отказе одного узла кластера ZooKeeper:
список топиков выводится;
чтение сообщений из топика выполняется успешно;
отправка сообщений в топик выполняется успешно.
Тест работоспособности Corax при отказе двух узлов кластера ZooKeeper:
список топиков не выводится;
чтение сообщений из топика выполняется успешно;
отправка сообщений в топик выполняется успешно.
Тест работоспособности Corax при полном отказе кластера ZooKeeper:
список топиков не выводится;
чтение сообщений из топика выполняется успешно;
отправка сообщений в топик выполняется успешно.
Конфигурирование Corax#
Конфигурирование узла (брокера) Corax#
Конфигурирование брокера Corax осуществляется в файле
config/server.properties.
Допускается создание файлов конфигурации с различными именами, например:
config/server-1-node-cluster.properties
config/server-3-node-cluster.properties
config/server-3-node-cluster-on-1-machine.properties
config/server-5-node-cluster.properties
При запуске Corax достаточно указать нужный конфигурационный файл.
Минимальная конфигурация#
Пример конфигурационного файла server.properties для одного брокера:
broker.id=1
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs
Шаги создания конфигурационного файла:
Присвойте ключу
broker.idнеобходимый числовой идентификатор. Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.Пример
broker.id=1Для ключа
zookeeper.connectукажитеIP:PORTсервиса ZooKeeper.Пример
Запущен один экземпляр ZooKeeper и его порт
clientPort=2181:zookeeper.connect=localhost:2181Пример
Запущен кластер из нескольких экземпляров ZooKeeper (адреса всех служб ZooKeeper указаны через запятую):
zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYYДля ключа
log.dirsзадайте одну или несколько директорий, в которых планируется хранение сообщений, передаваемых в Corax.Пример
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logsПримечание
Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.
Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logsДля ключа
listenersприсвойте строку формата<ПРОТОКОЛ>://<хост>:<порт>:Пример
listeners=PLAINTEXT://localhost:9092 listeners=SSL://<host2-ip-address>:9092 listeners=SASL_PLAINTEXT://:9092
Расширенная конфигурация#
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Наиболее важные из них приведены далее.
Если на одном физическом сервере планируется запускать несколько брокеров Corax, то для каждого брокера необходимо также указать уникальный порт, заданный в ключе
port. По умолчанию сервер Corax слушает порт9092.Можно изменить значение количества партиций для вновь создаваемых топиков по умолчанию в ключе
num.partitions. Если для сервера Corax доступны несколько жестких дисков, увеличение числа партиций увеличит производительность Corax. Количество партиций по умолчанию можно указать равным числу дисков.Пример:
num.partitions=4Для повышения надежности необходимо раскомментировать записи, содержащие ключи
log.flush.interval.messagesиlog.flush.interval.ms. В этом случае указывается, как часто должна происходить реальная запись сообщений на диск: либо по истечении заданного времени, либо после достижения заданного количества незаписанных сообщений. Данную настройку можно изменить для конкретного топика при его создании.Пример:
log.flush.interval.messages=10000 log.flush.interval.ms=1000Для ключа
log.retention.hoursзадается время хранения партиций в часах, по истечении которого партиция будет доступна для удаления. В данном случае отсчет будет вестись по самому новому сообщению в партиции.Пример
log.retention.hours=168Для ключа
log.cleaner.enableзадается значениеtrueдля активации механизма удаления старых партиций.Пример
log.cleaner.enable=true
Запуск узлов кластера Corax#
Выполнив вышеуказанные настройки, можно запускать узлы кластера Corax.
Отдельные узлы можно запускать в любое время, т. е. запуск отдельного узла сервера Corax не зависит от остальных, как это было с кластером ZooKeeper.
Запуск каждого отдельного сервера Corax выполняется командой:
bin/kafka-server-start.sh config/server.properties
При запуске всех узлов кластера Corax в консолях запуска должны быть следующие записи:
INFO [Kafka Server ID], started (kafka.server.KafkaServer)
Где ID — идентификатор сервера Corax, указанный в ключе broker.id.
Сетевые настройки#
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
message.max.bytes |
Максимальный размер сообщений, принимаемых сервером. Пример: message.max.bytes=1000012 |
int |
1000012 |
num.network.threads |
Количество сетевых потоков, используемых сервером для обработки сетевых запросов. Пример: num.network.threads=3 |
int |
3 |
queued.max.requests |
Лимит запросов. Если лимит превышен, начнется блокировка сетевых запросов. Пример: message.max.bytes=500 |
int |
500 |
replica.socket.receive.buffer.bytes |
Размер (в байтах) буфера сокета для приема сетевых запросов. Пример: replica.socket.receive.buffer.bytes=65536 |
int |
65536 |
replica.socket.timeout.ms |
Тайм-аут (в миллисекундах) сокета для сетевых запросов. Пример: replica.socket.timeout.ms=102400 |
int |
30000 |
request.timeout.ms |
Максимальное количество времени, которое уходит на ожидание клиентом ответа на отправленный запрос. Если в течение этого времени ответ на запрос не получен, клиент отправит повторный запрос. Если лимит повторных запросов превышен, то запросы будут прекращены. Пример: request.timeout.ms=102400 |
int |
30000 |
socket.receive.buffer.bytes |
Размер буфера в байтах на прием данных через сокет. Пример: socket.receive.buffer.bytes=102400 |
int |
102400 |
socket.request.max. bytes |
Максимальное количество байтов на сокет-запрос. Пример: socket.request.max.bytes=104857600 |
int |
104857600 |
socket.send.buffer.bytes |
Размер буфера в байтах на передачу данных через сокет. Пример: socket.send.buffer.bytes =102400 |
int |
102400 |
connections.max.idle.ms |
Тайм-аут неудачных попыток соединения в миллисекундах. Пример: connections.max.idle.ms=60000 |
long |
60000 |
controller.socket.timeout.ms |
Тайм-аут сокета в миллисекундах для Controller-to-Broker каналов. Пример: controller.socket.timeout.ms=30000 |
int |
30000 |
max.connections.per.ip |
Максимальное количество соединений с каждого IP-адреса. Пример: max.connections.per.ip=2147483647 |
int |
2147483647 |
max.connections.per.ip.overrides |
Настройка, позволяющая переопределять заданное максимальное количество соединений max.connections.per.ip, варьируя его для каждого IP-адреса. Пример: max.connections.per.ip.overrides=<host1-ip-address>:50,<host2-ip-address>:20,<host3-ip-address>:45 |
string |
«» |
Настройки подключения к ZooKeeper#
Запуск Corax невозможен без подключения к ZooKeeper.
Для подключения Corax-брокеров к кластеру ZooKeeper в файле конфигураций server.properties каждого Corax-брокера в свойство zookeeper.connect необходимо прописать список всех хостов ZooKeeper-брокеров в формате IP:PORT, разделяя их запятыми. Где:
IP— адрес хоста, на котором работает ZooKeeper-брокер;PORT— порт (clientPortв файлеzookeeper.properties) для подключения клиентов ZooKeeper — Corax-брокеров.
Для всех остальных настроек выставятся значения по умолчанию, если они не будут указаны явно в файле конфигураций. Список всех известных настроек ZooKeeper, относящихся к файлу конфигураций server.properties Corax-брокера, и их значений по умолчанию приведен в таблице ниже.
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
zookeeper.connect |
Список адресов ZooKeeper-брокеров в формате |
string |
«» |
zookeeper.connection.timeout.ms |
Максимальное время ожидания клиентом соединения с ZooKeeper. Если это значение не выставлено, то используется zookeeper.session.timeout.ms. Пример: zookeeper.connection.timeout.ms=6000 |
int |
null |
zookeeper.session.timeout.ms |
Тайм-аут сессии ZooKeeper в миллисекундах. Пример: zookeeper.session.timeout.ms=6000 |
int |
6000 |
zookeeper.set.acl |
Принудить клиента использовать безопасные ACL (Access Control Lists — списки контроля доступа). Пример: zookeeper.set.acl=false |
boolean |
false |
zookeeper.sync.time.ms |
Продолжительность времени в миллисекундах, через которое происходит синхронизация ZooKeeper-последователя с ZooKeeper-лидером. Пример: zookeeper.sync.time.ms=2000 |
int |
2000 |
Настройки диска#
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
log.dir |
Директория, в которой хранятся данные Corax — логи сообщений. Это свойство является дополнением к свойству |
string |
/tmp/kafka-logs |
log.dirs |
Список директорий, в которых хранятся данные Corax — логи сообщений. Если не задано, используется значение из |
string |
null |
log.flush.interval.messages |
Интервал, выражаемый количеством сообщений, накапливающихся в логе партиции до момента сохранения этого лога на диск. Если в логе накопилось сообщений меньше, чем задано в этом свойстве, то в случае сбоя (например, перезапуска ОС) эти сообщения будут потеряны на данной машине, поскольку хранились в оперативной памяти и не были сохранены на диск. Пример: log.flush.interval.messages=10000 |
long |
9223372036854775807 |
log.flush.interval.ms |
Максимальное время (в миллисекундах) хранения сообщения любого топика в оперативной памяти до того, как оно будет сохранено на диск. Если свойство не задано, будет использоваться значение из другого свойства — log.flush.scheduler.interval.ms. Пример: log.flush.interval.ms=1000 |
long |
null |
log.flush.offset.checkpoint.interval.ms |
Частота, с которой происходит обновление персистентной записи, сохраненной в ходе последней сессии записи на диск. Эта частота служит отправной точкой восстановления лога. Пример: log.flush.offset.checkpoint.interval.ms=10 |
int |
60000 |
log.flush.scheduler.interval.ms |
Частота в миллисекундах, с которой данные каждого лога сохраняются на диск. Пример: log.flush.scheduler.interval.ms=10 |
long |
9223372036854775807 |
log.retention.bytes |
Максимальный размер лога перед его удалением. Пример: log.retention.bytes =100 |
long |
-1 |
log.retention.hours |
Время (в часах) хранения лог-файла до его удаления. Пример: log.retention.hours=168 |
int |
168 |
log.retention.minutes |
Время (в минутах) хранения лог-файла до его удаления. Если это свойство не задано, будет использоваться значение из log.retention.hours. Пример: log.retention.minutes=100000 |
int |
null |
log.retention.ms |
Время (в миллисекундах) хранения лог-файла до его удаления (третье по важности после свойства log.retention.minutes). Если это свойство не задано, будет использоваться значение из log.retention.minutes. Пример: log.retention.ms=6000 |
long |
null |
log.roll.hours |
Циркуляция логов (в часах) — максимальное количество времени до записи нового сегмента лога. Это свойство имеет более низкий приоритет, чем аналогичное свойство log.roll.ms. Пример: log.roll.hours=10 |
int |
168 |
log.roll.jitter.hours |
Максимальное случайное отклонение (для циркуляции логов) (в часах) для вычитания из значения logRollTimeMillis (в миллисекундах). Вторично по отношению к свойству log.roll.jitter.ms. Пример: log.roll.jitter.hours=1 |
int |
0 |
log.roll.ms |
Циркуляция логов (в миллисекундах) — максимальное количество времени до записи нового сегмента лога. Если свойство не задано, используется значение из log.roll.hours. Пример: log.roll.ms=10 |
long |
null |
log.segment.bytes |
Максимальный размер одного лог-файла. Пример: log.segment.bytes=250 |
int |
1073741824 |
log.segment.delete.delay.ms |
Количество времени (в миллисекундах) до удаления файла из файловой системы. Пример: log.roll.delete.delay.ms=10 |
long |
60000 |
log.cleaner.backoff.ms |
Количество времени на простаивание до появления логов для удаления. Пример: log.cleaner.backoff.ms=10 |
long |
15000 |
log.cleaner.dedupe.buffer.size |
Общий размер памяти, выделяемой для удаления дубликатов во всех потоках службы «Log Cleaner». Пример: log.cleaner.dedupe.buffer.size=10 |
long |
134217728 |
log.cleaner.delete.retention.ms |
Количество времени сохранения удаленных записей. Пример: log.cleaner.delete.retention.ms=10 |
long |
86400000 |
log.cleaner.enable |
Включение службы «Log Cleaner». Должно иметь значение true, если есть какой-либо топик со свойством cleanup.policy=compact, включая топик с внутренними смещениями. Если значение установлено в false, то эти топики не будут компактными и будут постоянно увеличиваться в размере. Пример: log.cleaner.enable=true |
boolean |
true |
log.cleaner.io.buffer.load.factor |
Коэффициент загрузки буфера при удалении дубликатов процессом «Log Cleaner». Например, при коэффициенте, равном 0.9, степень загрузки буфера — 90%. Чем больше значение, тем большее количество логов будет удаляться за один раз, однако вместе с этим будет возрастать и число хеш-коллизий. Пример: log.cleaner.io.buffer.load.factor=0.9 |
double |
0.9 |
log.cleaner.io.buffer.size |
Общий объем памяти, используемый службой «Log Cleaner» на операции ввода/вывода для всех потоков этой службы. Пример: log.cleaner.io.buffer.size=255 |
int |
524288 |
log.cleaner.io.max.bytes.per.second |
Ограничение операций ввода/вывода службы «Log Cleaner» так, чтобы сумма байтов при операциях ввода и вывода была всегда меньше значения, указанного в этом свойстве. Пример: log.cleaner.io.max.bytes.per.second=5.0 |
double |
1.7976931348623157E308 |
log.cleaner.min.cleanable.ratio |
Минимальное соотношение между «грязным» и общим логом для определения пригодности лога для удаления. Пример: log.cleaner.min.cleanable.ratio=0.5 |
double |
0.5 |
log.cleaner.threads |
Число фоновых потоков, используемых для чистки логов службой «Log Cleaner». Пример: log.cleaner.threads=1 |
int |
1 |
log.cleanup.policy |
Политика очистки сегментов логов, находящихся за пределами сохраняющего окна «retention window». Возможны два режима: delete — удаление лога, compact — сжатие лога. Пример: log.cleanup.policy=delete |
string |
delete |
log.index.interval.bytes |
Интервал (в байтах), через который прибавляется запись к индексу смещения (offset). Пример: log.index.interval.bytes=4096 |
int |
4096 |
log.index.interval.bytes |
Интервал (в байтах), через который прибавляется запись к индексу смещения (offset). Пример: log.index.interval.bytes=4096 |
int |
4096 |
log.index.size.max.bytes |
Максимальный размер индекса смещения (offset) в байтах. Пример: log.index.size.max.bytes=10485760 |
int |
10485760 |
log.preallocate |
Требуется ли создавать файл при выделении нового сегмента. Для Kafka на Windows рекомендуется установить значение true. Пример: log.preallocate=false |
boolean |
false |
log.retention.check.interval.ms |
Частота в миллисекундах, с которой служба «Log Cleaner» проверяет каждый лог на необходимость его удаления. Пример: log.retention.check.interval.ms=300000 |
long |
300000 |
Настройки топиков#
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
cleanup.policy |
Возможны два режима: delete — удаление лога, compact — сжатие лога. Пример: cleanup.policy=delete |
log.cleanup.policy |
delete |
delete.retention.ms |
log.cleaner.delete.retention.ms |
86400000 (24 hours) |
|
flush.messages |
log.flush.interval.messages |
None |
|
flush.ms |
log.flush.interval.messages |
None |
|
index.interval.bytes |
log.flush.interval.ms |
4096 |
|
max.message.bytes |
message.max.bytes |
1000000 |
|
min.cleanable.dirty.ratio |
log.cleaner.min.cleanable.ratio |
0.5 |
|
min.insync.replicas |
min.insync.replicas |
1 |
|
retention.bytes |
log.retention.bytes |
None |
|
retention.ms |
Максимальное время (в миллисекундах) хранения логов до того, как старые сегменты лога будут удалены для освобождения дискового пространства. Свойство работает только в том случае, если |
log.retention.ms |
(7 days) |
segment.bytes |
Размер лог-файла в байтах. Сохраняется и удаляется всегда по одному файлу за один раз |
log.segment.bytes |
1 ГБ |
segment.index.bytes |
Размер индекса, который отображает смещения (offset) на позиции в файлах |
log.index.size.max.bytes |
10 МБ |
segment.ms |
log.roll.hours |
7 days |
|
segment.jitter.ms |
log.roll.jitter.{ms,hours} |
0 |
Конфигурирование кластера Corax#
Конфигурирование кластера серверов Corax, как и конфигурирование одного узла, осуществляется в файле server.properties на каждом сервере, который планируется включить в кластер.
Минимальная конфигурация кластера#
Пример конфигурационного файла server.properties для кластера Corax из двух брокеров при установке по профилю no-auth (приведен пример конфигурации для одного из брокеров):
broker.id=1
listeners=PLAINTEXT://10.XX.XX.XX:YYYY
zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYY
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs
Шаги создания конфигурационного файла:
Присвойте ключу
broker.idнеобходимый числовой идентификатор.Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.
Пример
На первом сервере Corax:
broker.id=1На втором сервере:
broker.id=2Присвойте ключу
listenersIP-адрес сервера.Пример
listeners=PLAINTEXT://10.XX.XX.XX:YYYYЕсли на одном физическом сервере планируется запускать несколько серверов Corax, то для каждого сервера необходимо также указать уникальный порт, заданный в ключе
listeners. По умолчанию сервер Corax слушает порт 9092.Для ключа
zookeeper.connectукажите IP и PORT сервиса ZooKeeper в форматеzookeeper.connect=IP:PORT. Если кластер ZooKeeper запущен, то укажите адреса ZooKeeper-сервисов через запятую:Пример
zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYYДля ключа
log.dirsзадайте директорию, в которой планируется хранение сообщений, передаваемых в Corax.Пример
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logsПримечание
Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.
Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logs
На данном этапе минимальная конфигурация узлов кластера Corax завершается. Если не существует каких-либо ошибок в конфигурации, а также ZooKeeper сконфигурирован корректно и запущен, то этого вполне достаточно для успешного запуска и работы кластера Corax.
Расширенная конфигурация кластера#
Конфигурирование производителя (producer)#
Минимальная конфигурация
Пример конфигурационного файла producer.properties для одного брокера (в примере в качестве хоста производителя указан текущий хост — hostname -f):
acks=1
bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZ
Шаги создания конфигурационного файла:
Присвойте ключу
acksзначение, указывающее, что сообщение отправлено:0 — сообщение расположено в TCP-буфере для отправки по сети;
1 — сообщение успешно записано в партицию-лидер;
1 (all) — сообщение успешно записано на все
ISR(InSyncReplicasпараметр на Kafka broker(server.properties))
Пример
acks=1Для ключа
bootstrap.serversукажитеIP:PORTсервиса Corax.Пример
Если запущены 3 экземпляра Corax на том же сервере:
bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZДопускается также указывать IP-адреса, полные доменные имена.
Расширенная конфигурация
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.
Запуск каждого отдельного производителя Corax выполняется командой:
./bin/kafka-console-producer --bootstrap-server `hostname -f`:9101, `hostname -f`:9102, `hostname -f`:9103 --topic test1 --producer.config etc/kafka/producer.properties
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
key.serializer |
Класс сериализатора для ключа, реализующего org.apache.kafka.common.serialization.Serializerinterface |
class |
Неприменимо |
value.serializer |
Класс сериализатора для значения, реализующего org.apache.kafka.common.serialization.Serializerinterface |
class |
Неприменимо |
acks |
Количество подтверждений, которые производитель должен получить от лидера, прежде чем считать запрос завершенным. Это определяет долговечность записей, которые будут отправлены. Разрешены следующие настройки: |
string |
all |
bootstrap.servers |
Список пар «хост: порт», используемых для установления начального соединения с кластером Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде: host1:port1, host2:port2, … Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (можно указать более одного, на случай, если указанный сервер не работает) |
list |
«» |
buffer.memory |
Общее количество байтов памяти, которое производитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, производитель блокируется на время, заданное параметром max.block.ms, после чего он выдаст исключение. Этот параметр должен примерно соответствовать общей памяти, которую будет использовать производитель, но не является жесткой привязкой, поскольку не вся память, используемая производителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержания inflight-запросов |
long |
33554432 |
compression.type |
Тип сжатия для всех данных, генерируемых производителем. Допустимые значения: none (без сжатия), gzip, snappy, lz4 или zstd. Сжатие полных партий данных, поэтому эффективность разбивки на порции данных также повлияет на степень сжатия (большее дозирование означает лучшее сжатие) |
string |
none |
retries |
Установка значения больше нуля будет означать для клиента переотправку любой записи, отправка которой завершилась предположительно временной ошибкой. Эта попытка не отличается от случая, когда клиент получает ошибку. Разрешение повторов (retries) без установки параметра max.in.flight.requests.per.connection=1 потенциально изменит порядок записей, т. к. если два пакета отправляются в один раздел и первый пакет вызывает сбой и повторяется, но второй успешно записан, то запись о втором пакете может появиться первой. Запросы на запись будут завершены ошибкой до того, как количество попыток будет исчерпано, а если тайм-аут настроен, delivery.timeout.msexpires будет достигнут до успешного подтверждения. Пользователи обычно предпочитают не указывать этот параметр, а использовать delivery.timeout.msexpires для управления поведением повторной попытки |
int |
2,15E+09 |
ssl.key.password |
Пароль закрытого ключа в файле keystore. Это необязательно для клиента |
password |
null |
ssl.keystore.location |
Расположение закрытого ключа в файле keystore. Это необязательно для клиента |
string |
null |
ssl.keystore.password |
Пароль хранилища для файла хранилища ключей (keystore). Это необязательно для клиента и требуется, если используется параметр ssl.keystore.location |
password |
null |
ssl.truststore.location |
Расположение файла хранилища доверительных сертификатов (truststore) |
string |
null |
ssl.truststore.password |
Пароль для файла хранилища доверительных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, но проверка целостности отключена |
password |
null |
batch.size |
При отправке нескольких записей в один раздел производитель попытается объединить записи в меньшее количество запросов. Это повышает производительность как на клиенте, так и на сервере. Этот параметр управляет размером пакета по умолчанию в байтах. Не будет предпринята попытка объединения записей в пакеты большего размера. Запросы, отправленные брокерам, будут содержать несколько пакетов, по одному для каждой партиции с доступными для отправки данными. Небольшой размер пакета сделает дозирование менее распространенным и может уменьшить пропускную способность (нулевой размер пакета полностью отключит дозирование). Очень большой размер пакета может использовать память немного более расточительно, так как всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей |
int |
16384 |
client.dns.lookup |
Параметр управляет тем, как клиент использует DNS lookup. Если установлено значение use\all\dns\ips, то, когда поиск возвращает несколько IP-адресов для имени хоста, все они будут пытаться подключиться до сбоя подключения. Применяется как к bootstrap, так и к advertised серверам. Если установлено значение resolve\canonical\bootstrap\servers\only, каждая запись будет разрешена и расширена в список канонических имен |
string |
default |
client.id |
Строка id для передачи серверу при выполнении запросов. Позволяет отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в журнал запросов на стороне сервера |
string |
«» |
connections.max.idle.ms |
Закрытие неактивных соединений после количества миллисекунд, указанных для этого параметра |
long |
540000 |
delivery.timeout.ms |
Верхняя граница времени для отчета об успехе или сбое после получения ответа на вызов send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (acks) (если ожидается) и время, разрешенное для повторных отправок при сбоях. Producer может сообщить об ошибке отправки записи раньше, чем это обозначено в конфигурации, если обнаружена неустранимая ошибка, количество повторных попыток отправки было исчерпано или запись добавлена в пакет, который достиг окончания срока доставки (deadline) раньше. Значение этой конфигурации должно быть больше или равно сумме request.timeout.ms + linger.ms |
int |
120000 |
linger.ms |
Производитель группирует все записи, поступающие между запросами передачи, в один пакетированный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем они могут быть отправлены. Однако в некоторых случаях клиенту может понадобиться уменьшить количество запросов даже при умеренной нагрузке. Рассматриваемая настройка позволяет это сделать, добавляя небольшое количество искусственной задержки, то есть вместо немедленной отправки записи производитель будет ждать до заданной задержки, чтобы разрешить отправку других записей и отправить их вместе. Это можно рассматривать как аналог алгоритма Нейгла в TCP. Параметр определяет верхнюю границу задержки для дозирования: как только получен batch.size записей для партиции, они будут отправлены немедленно независимо от этого параметра, однако если количество байтов меньше, чем накопленных для этого раздела, будет выполняться задержка в течение указанного времени, ожидая появления большего количества записей. Этот параметр по умолчанию равен 0 (т. е. без задержки). Установка linger.ms=5, например, будет иметь эффект уменьшения количества отправленных запросов, но добавит до 5 мс задержки к записям, отправленным в отсутствие нагрузки |
int |
0 |
max.block.ms |
Параметр определяет, как долго KafkaProducer.send() и KafkaProducer.partitionsFor() будут блокироваться. Эти методы могут быть заблокированы, так как буфер заполнен или метаданные недоступны. Блокировка в пользовательских сериализаторах или разделителе не будет учитываться в течение этого времени ожидания |
long |
60000 |
max.request.size |
Максимальный размер запроса в байтах. Этот параметр ограничивает количество партий записей, которые производитель отправляет в одном запросе, чтобы избежать отправки огромных запросов. Это также эффективно ограничивает максимальный размер пакета записи. Обратите внимание, что сервер имеет свой собственный размер пакета записи, который может отличаться от заданного в данном параметре |
int |
1048576 |
partitioner.class |
Класс разделителя, реализующий org.apache.kafka.clients.producer.Partitioner-интерфейс |
class |
org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes |
Размер буфера приема TCP (SO_RCVBUF) для использования при чтении данных. Если значение равно -1, по умолчанию будет использоваться размер буфера, установленный в ОС |
int |
32768 (32 кБ) |
request.timeout.ms |
Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны. Значение должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера) для уменьшения возможности дублирования сообщений из-за ненужных попыток производителя |
int |
30000 |
sasl.client.callback.handler.class |
Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler |
class |
null |
sasl.jaas.config |
Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: |
password |
null |
sasl.kerberos.service.name |
Имя участника (принципала) Kerberos, под которым работает Kafka. Это можно определить либо в конфигурации JAAS Kafka, либо в конфигурации Kafka |
string |
null |
sasl.login.callback.handler.class |
Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс AuthenticateCallbackHandler. Для брокеров конфигурация обработчика обратного вызова входа (login callback handler) должна иметь префикс прослушивателя (listener) и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class |
null |
sasl.login.class |
Полное имя класса, который определяет интерфейс входа. Для брокеров конфигурация входа должна быть с префиксом listener’а и именем механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin |
class |
null |
sasl.mechanism |
Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого доступен поставщик безопасности. GSSAPI является механизмом по умолчанию |
string |
GSSAPI |
security.protocol |
Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL |
string |
PLAINTEXT |
send.buffer.bytes |
Размер буфера отправки TCP (SO_SNDBUF) для использования при отправке данных. Если значение равно -1, будет использоваться ОС по умолчанию |
int |
131072 |
ssl.enabled.protocols |
Список протоколов, разрешенных для SSL-соединений |
list |
TLSv1.3,TLSv1.2 |
ssl.keystore.type |
Формат файла хранилища ключей (key store). Параметр необязателен для клиента |
string |
JKS |
ssl.protocol |
Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимые значения: TLSv1.2 и TLSv1.3. SSL, SSLv2 и SSLv3 могут поддерживаться в старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности |
string |
TLS |
ssl.provider |
Имя поставщика безопасности, используемого для SSL-соединений. Значение по умолчанию — значение, установленное по умолчанию в JVM |
string |
null |
ssl.truststore.type |
Формат файла хранилища доверия (trust store) |
string |
JKS |
enable.idempotence |
Если задано значение true, производитель гарантирует, что в поток записывается ровно одна копия каждого сообщения. Если false, производитель повторяет попытку из-за сбоев брокера и т. д., может писать дубликаты повторенного сообщения в потоке. Обратите внимание, что для включения идемпотенции требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5, количество попыток переотправки (retries) было больше 0, а ack=all. Если эти значения явно не заданы пользователем, будут выбраны подходящие значения. Если установлены несовместимые значения, будет создано исключение ConfigException |
boolean |
false |
interceptor.classes |
Список классов для использования в качестве перехватчиков. Реализовывает org.apache.kafka.clients.producer.ProducerInterceptor-интерфейс, позволяющий перехватывать (и, возможно, изменять) записи, полученные производителем до их публикации в кластере Kafka. По умолчанию перехватчиков нет |
list |
«» |
max.in.flight.requests.per.connection |
Максимальное количество неподтвержденных запросов, отправляемых клиентом по одному соединению перед блокировкой. Если этот параметр больше 1 и есть неудачные отправки, существует риск переупорядочения сообщений из-за повторных попыток (т. е. если повторные попытки включены) |
int |
5 |
metadata.max.age.ms |
Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений в руководстве разделов, чтобы проактивно обнаружить новые брокеры или разделы |
long |
300000 |
metric.reporters |
Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporter интерфейса позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX |
list |
«» |
metrics.num.samples |
Количество выборок, сохраняемых для вычисления метрик |
int |
2 |
metrics.recording.level |
Самый высокий уровень записи для метрик |
string |
INFO |
metrics.sample.window.ms |
Окно времени, в котором вычисляется выборка метрик |
long |
30000 |
reconnect.backoff.max.ms |
Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного джиттера; это позволяет избежать избыточного количества подключений, которое ведет к деградации производительности брокера |
long |
1000 |
reconnect.backoff.ms |
Базовое время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру |
long |
50 |
retry.backoff.ms |
Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя |
long |
100 |
sasl.kerberos.kinit.cmd |
Путь к команде Kerberos kinit |
string |
/usr/bin/kinit |
sasl.kerberos.min.time.before.relogin |
Время сна потока входа между попытками обновления |
long |
60000 |
sasl.kerberos.ticket.renew.jitter |
Процент случайного jitter, добавленного к времени обновления |
double |
0.05 |
sasl.kerberos.ticket.renew.window.factor |
Определяет окно времени в процентах от срока действия билета безопасности Kerberos. Когда остаток времени до истечения срока действия билета достигает определенного процента, заданного данным параметром, сеансы аутентификации могут попытаться обновить билет Kerberos |
double |
0.8 |
sasl.login.refresh.buffer.seconds |
Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER |
short |
300 |
sasl.login.refresh.min.period.seconds |
Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER |
short |
60 |
sasl.login.refresh.window.factor |
Поток обновления входа будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%). В настоящее время применяется только к носителю OAUTHBEARER |
double |
0.8 |
sasl.login.refresh.window.jitter |
Максимальное количество случайного джиттера относительно времени жизни учетных данных, добавленного ко времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно. Значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER |
double |
0.05 |
ssl.cipher.suites |
Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров |
list |
null |
ssl.endpoint.identification.algorithm |
Алгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера |
string |
https |
ssl.keymanager.algorithm |
Алгоритм, используемый фабрикой key manager для SSL-соединений. Значение по умолчанию — это алгоритм фабрики Key manager, настроенный для виртуальной машины Java |
string |
SunX509 |
ssl.secure.random.implementation |
Реализация SecureRandom PRNG, используемая для операций шифрования SSL |
string |
null |
ssl.trustmanager.algorithm |
Алгоритм, используемый фабрикой trust manager для SSL-соединений. Значение по умолчанию — это заводской алгоритм trust manager, настроенный для виртуальной машины Java |
string |
PKIX |
transaction.timeout.ms |
Максимальное время в миллисекундах, в течение которого координатор транзакций будет ждать обновления статуса транзакции от производителя, прежде чем проактивно прервать текущую транзакцию. Если это значение больше, чем transaction.max.timeout.ms в брокере, запрос завершится ошибкой InvalidTransactionTimeout |
int |
60000 |
transactional.id |
Идентификатор транзакции, используемый для доставки транзакций. Это включает семантику надежности, которая охватывает несколько сеансов производителя, так как это позволяет клиенту гарантировать, что транзакции с использованием того же TransactionalId были завершены до начала любых новых транзакций. Если идентификатор транзакции не указан, производитель ограничивается идемпотентной доставкой. Обратите внимание, что enable.idempotencemust должна быть включена, если настроен идентификатор транзакции. Значение по умолчанию равно null, что означает невозможность использования транзакций. По умолчанию для транзакций требуется кластер по крайней мере из трех брокеров, что является рекомендуемым параметром для производства; для разработки можно изменить это, настроив параметр брокера transaction.state.log.replication.factor |
string |
null |
Конфигурирование потребителя (consumer)#
Минимальная конфигурация#
Пример конфигурационного файла consumer.properties для одного брокера (в примере в качестве хоста потребителя указан текущий хост — hostname -f):
group.id=test-consumer-group
bootstrap.servers=`hostname -f`:9101, `hostname -f`:9102,
`hostname -f`:9103
Шаги создания конфигурационного файла:
Присвойте ключу
group.idнеобходимый идентификатор группы, под которым будет запущен consumer. Можно запускать несколько consumers под одним и тем жеgroup.id, что позволит считывать данные в несколько потоков (с нескольких серверов), обеспечивая отсутствие дублирования вычитываемых данных.Пример
group.id=test-consumer-groupДля ключа
bootstrap.serversукажитеIP:PORTсервиса Corax.Пример
Если запущены 3 экземпляра Corax на том же сервере:
bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZДопускается также указывать IP-адреса, полные доменные имена.
Расширенная конфигурация#
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.
Запуск каждого отдельного потребителя Corax выполняется командой:
./bin/kafka-console-consumer --bootstrap-server `hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZ --topic test1 --consumer.config etc/kafka/consumer.properties --from-beginning
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
key.deserializer |
Класс десериализатора для ключа, реализующего org.apache.kafka.common.serialization.Deserializerinterface |
class |
|
value.deserializer |
Класс десериализатора для значения, реализующего org.apache.kafka.common.serializationDeserializerinterface |
class |
|
bootstrap.servers |
Список пар «хост: порт», используемых для установления начального соединения с кластером Platform V Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде host1: port1, host2:port2,… Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (разрешено указать более одного, на случай, если указанный сервер не работает) |
list |
«» |
fetch.min.bytes |
Минимальный объем данных, который сервер должен вернуть для запроса на выборку (fetch request). Если данных недостаточно, брокер будет ждать, пока накопится необходимое количество данных, прежде чем ответить на запрос. Значение по умолчанию 1 байт означает, что запросы на выборку отвечают, как только доступен один байт данных или время ожидания запроса на выборку данных заканчивается. Установка этого значения больше 1 заставит сервер ждать накопления больших объемов данных, что может немного повысить пропускную способность сервера за счет некоторой дополнительной задержки |
int |
1 |
group.id |
Уникальная строка, идентифицирующая группу потребителей, к которой принадлежит данный потребитель. Это свойство является обязательным, если потребитель использует функции управления группами с помощью subscribe (topic) или стратегии управления смещением на основе Corax |
string |
null |
heartbeat.interval.ms |
Ожидаемое время между heartbeats (передачей сигнала «я жив») координатору потребителя при использовании средств управления группы Kafka. Передача сигнала используется для обеспечения активного сеанса потребителя и облегчения перебалансировки, когда новые потребители присоединяются или покидают группу. Значение должно быть установлено ниже session.timeout.ms, однако обычно должно устанавливается не выше 1/3 от session.timeout.ms. Его можно уменьшать с целью большего контроля предполагаемого времени для нормальных перебалансировок |
int |
3000 |
max.partition.fetch.bytes |
Максимальный объем данных на партицию, который будет возвращен сервером. Записи извлекаются потребителем пакетами (batch). Если первый пакет записей в первом непустом разделе выборки больше этого предела, пакет все равно будет возвращен, чтобы гарантировать, что потребитель может добиться успеха по извлечению данных. Максимальный размер пакета записей, принятый брокером, определяется посредством |
int |
1048576 |
session.timeout.ms |
Тайм-аут, используемый для обнаружения сбоев потребителей при использовании средства управления группами Corax. Потребитель посылает периодические heartbeats, чтобы сообщить брокеру, что он жив (не завис). Если до истечения этого тайм-аута брокер не получит heartbeat, то брокер удалит этого потребителя из группы и инициирует перебалансировку. Значение должно находиться в допустимом диапазоне, настроенном в конфигурации брокера group.min.session.timeout.ms и group.max.session.timeout.ms |
int |
10000 |
ssl.key.password |
Пароль закрытого ключа в файле keystore. Необязательный параметр для клиента |
password |
null |
ssl.keystore.location |
Расположение файла хранилища ключей keystore. Необязательный параметр для клиента и может использоваться для двусторонней проверки подлинности (authentication) |
string |
null |
ssl.keystore.password |
Пароль хранилища для файла хранилища ключей keystore. Необязательный параметр для клиента и требуется только в случае, если объявлено ssl.keystore.location |
password |
null |
ssl.truststore.location |
Расположение файла хранилища доверенных сертификатов (truststore) |
string |
null |
ssl.truststore.password |
Пароль для файла хранилища доверенных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, однако проверка целостности отключена |
password |
null |
auto.offset.reset |
Определяет действие, выполняемое, если в Corax нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены): |
string |
latest |
client.dns.lookup |
Управляет тем, как клиент использует DNS-запросы. |
string |
default |
connections.max.idle.ms |
Закрытие незанятых соединений после указанного количества миллисекунд |
long |
540000 |
default.api.timeout.ms |
Задает время ожидания (в миллисекундах) для API-интерфейсов потребителя, которые могут блокировать. Эта конфигурация используется в качестве тайм-аута по умолчанию для всех операций потребителя, которые явно не принимают timeoutparameter |
int |
60000 |
enable.auto.commit |
Если установлено значение true, смещение потребителя будет периодически фиксироваться в фоновом режиме |
boolean |
true |
exclude.internal.topics |
Следует ли предоставлять потребителю записи из внутренних (системных) топиков (например, топик смещений). Если установлено значение true, единственным способом получения записей из внутренних (системных) топиков является подписка на них |
boolean |
true |
fetch.max.bytes |
Максимальный объем данных, который сервер должен вернуть в ответ на запрос по выборке. Записи извлекаются потребителем пакетами, и если первый пакет записей в первом непустом разделе выборки больше этого значения, пакет записей все равно будет возвращен, чтобы гарантировать, что потребитель работоспособен и может извлекать данные (есть все разрешения и д.р.). Таким образом, это не абсолютный максимум. Максимальный размер пакета записей, принятый брокером, определяется посредством message.max.bytes (настройки брокеров) или max.message.bytes (настройки топиков). Обратите внимание, что потребитель выполняет несколько выборок параллельно |
int |
52428800 |
isolation.level |
Управляет чтением сообщений, написанных транзакционно. Если установлено значение |
string |
read\uncommitted |
max.poll.interval.ms |
Максимальная задержка между вызовами poll () при использовании управления группами потребителей. Это накладывает ограничение на количество времени, которое потребитель может простаивать до получения других записей. Если poll () не вызывается до истечения этого тайм-аута, то потребитель считается неработоспособным и группа будет перебалансироваться, чтобы переназначить разделы другому члену |
int |
300000 |
max.poll.records |
Максимальное |
int |
500 |
partition.assignment.strategy |
Имя класса стратегии назначения разделов, которую клиент будет использовать для распределения партиций между экземплярами-потребителями при использовании управления группами |
list |
class org.apache.kafka.clients.consumer.RangeAssignor |
receive.buffer.bytes |
Размер буфера приема TCP (SO\RCVBUF), используемый при чтении данных. Если значение равно -1, по умолчанию будет использоваться размер буфера, установленный в ОС |
int |
65536 (64 кБ) |
request.timeout.ms |
Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны |
int |
30000 |
sasl.client.callback.handler.class |
Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler |
class |
null |
sasl.jaas.config |
Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: „loginModuleClass controlFlag (optionName=optionValue)*;“. Для брокеров конфигурация должна иметь префикс слушателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required |
password |
null |
sasl.kerberos.service.name |
Имя принципала Kerberos, под которым работает Rfarf. Это можно определить либо в конфигурации JAAS Corax, либо в конфигурации Kafka |
string |
null |
sasl.login.callback.handler.class |
Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс обработчика AuthenticateCallbackHandler. Для брокеров конфигурация logincallbackhandler должна иметь префикс прослушивателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class |
null |
sasl.login.class |
Полное имя класса, реализующего интерфейс входа в систему. Для брокеров конфигурация входа должна иметь префикс прослушивателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin |
class |
null |
sasl.mechanism |
Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого поставщик безопасности доступен. GSSAPI является механизмом по умолчанию |
string |
GSSAPI |
security.protocol |
Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL |
string |
PLAINTEXT |
send.buffer.bytes |
Размер буфера отправки TCP (SO\SNDBUF), используемый при отправке данных. Если значение равно -1, по умолчанию будет использоваться параметр, установленный в ОС |
int |
131072 |
ssl.enabled.protocols |
Список протоколов, разрешенных для SSL-соединений |
list |
TLSv1.3,TLSv1.2 |
ssl.keystore.type |
Формат файла хранилища ключей. Параметр необязателен для клиента |
string |
JKS |
ssl.protocol |
Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимыми значениями в современных JVM являются TLSv1.2 и TLSv1.3. SSL, SSLv2 и SSLv3 могут поддерживаться в более старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности |
string |
TLS |
ssl.provider |
Имя поставщика безопасности, используемого для SSL-соединений. Значением по умолчанию является поставщик безопасности по умолчанию для JVM |
string |
null |
ssl.truststore.type |
Формат файла хранилища доверительных сертификатов (truststore) |
string |
JKS |
auto.commit.interval.ms |
Частота в миллисекундах, с которой потребитель смещает offsets автоматически в Corax, если enable.auto.commit=true |
int |
5000 |
check.crcs |
Автоматически проверять CRC32 потребляемых (вычитываемых) записей. Это гарантирует отсутствие повреждения сообщений в сети или на диске. Эта проверка добавляет некоторые накладные расходы, поэтому она может быть отключена в случаях, требующих экстремальной производительности |
boolean |
true |
client.id |
Строка id для передачи серверу при выполнении запросов. Цель этого состоит в том, чтобы иметь возможность отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в журнал запросов на стороне сервера |
string |
«» |
fetch.max.wait.ms |
Максимальное количество времени, на которое сервер заблокируется перед ответом на запрос fetch, если данных недостаточно для немедленного удовлетворения требования fetch.min.bytes |
int |
500 |
interceptor.classes |
Список классов для использования в качестве перехватчиков. Реализация интерфейса org.apache.kafka.clients.ConsumerInterceptor позволяет перехватывать (и, возможно, изменять) записи, полученные потребителем. По умолчанию перехватчиков нет |
list |
«» |
metadata.max.age.ms |
Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений лидеров партиций, чтобы проактивно обнаружить новые брокеры или партиции |
long |
300000 |
metric.reporters |
Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporterinterface позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX |
list |
«» |
metrics.num.samples |
Количество выборок, сохраняемых для вычисления метрик |
int |
2 |
metrics.recording.level |
Самый высокий уровень записи для метрик |
string |
INFO |
metrics.sample.window.ms |
Окно времени, в котором вычисляется выборка метрик |
long |
30000 |
reconnect.backoff.max.ms |
Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, к которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного джиттера; это позволяет избежать избыточного количества подключений, которое ведет к деградации производительности брокера |
long |
1000 |
reconnect.backoff.ms |
Базовое время ожидания перед попыткой повторного подключения к заданному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру |
long |
50 |
retry.backoff.ms |
Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя |
long |
100 |
sasl.kerberos.kinit.cmd |
Путь к команде Kerberos kinit |
string |
/usr/bin/kinit |
sasl.kerberos.min.time.before.relogin |
Время сна потока входа между попытками обновления |
long |
60000 |
sasl.kerberos.ticket.renew.jitter |
Процент случайного джиттера, добавленного ко времени обновления |
double |
0.05 |
sasl.kerberos.ticket.renew.window.factor |
Определяет окно времени в процентах от срока действия билета безопасности Kerberos. Когда остаток времени до истечения срока действия билета достигает определенного процента, заданного данным параметром, сеансы аутентификации могут попытаться обновить билет Kerberos |
double |
0.8 |
sasl.login.refresh.buffer.seconds |
Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); если значение не указано, используется значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к OAUTHBEARER |
short |
300 |
sasl.login.refresh.min.period.seconds |
Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию — 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к OAUTHBEARER |
short |
60 |
sasl.login.refresh.window.factor |
Поток обновления входа (login) будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%) |
double |
0.8 |
sasl.login.refresh.window.jitter |
Максимальное количество случайного джиттера относительно времени жизни учетных данных, добавленного к времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно; значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER |
double |
0.05 |
ssl.cipher.suites |
Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров |
list |
null |
ssl.endpoint.identification.algorithm |
Алгоритм идентификации конечной точки(endpoint) для проверки имени хоста сервера с помощью сертификата сервера |
string |
https |
ssl.keymanager.algorithm |
Алгоритм, используемый фабрикой менеджера ключей для SSL-соединений. Значение по умолчанию — это алгоритм key manager factory, настроенный для виртуальной машины Java |
string |
SunX509 |
ssl.secure.random.implementation |
Реализация SecureRandom PRNG, используемая для операций шифрования SSL |
string |
null |
Обновление SSL сертификатов на кластере Corax без недоступности кластера Corax#
Алгоритм обновления SSL сертификатов на брокерах кластера Corax:
Остановите брокер, который еще не обновлялся.
Для остановленного брокера обновите SSL сертификат.
Запустите брокер, и дождитесь завершения полной репликации (пример команды:
kafka-topics --zookeeper host:2181 --describe --under-replicated-partitions).Повторите шаг 1 для следующего брокера. Если это был последний брокер — обновление завершено.
Конфигурирование аудита событий над брокером в log-файл#
Для включения аудита над брокером Corax в log-файл, добавьте в конфигурационный файл
KAFKA_DIR/config/server.propertiesстроки:kafka.se.audit.enable=true kafka.se.audit.provider=kafka.audit.provider.LogAuditProviderгде
kafka.se.audit.enable— глобальное включение аудита;Примечание
По умолчанию будет включен аудит записи действий на брокере в log-файл
kafka-audit.logна уровне INFO.При необходимости указать другой log-файл для аудита и/или задать свои параметры (уровень логирования, вид записи, форматирование или режим записи rolling update), укажите appender в файле
config/log4j.properties:log4j.appender.auditFileAppender = org.apache.log4j.RollingFileAppender //включение режима записи rolling update log4j.appender.auditFileAppender.File = ${kafka.logs.dir}/kafka-audit.log //путь к log-файлу log4j.appender.auditFileAppender.layout.ConversionPattern = [%d] %p %m (%c)%n //формат даты и времени log4j.logger.kafka.audit.provider.LogAuditProvider = INFO, auditFileAppender //уровень логирования
Конфигурирование Schema Registry#
Конфигурирование Schema Registry описано в разделе «Конфигурация Schema Registry» документа «Schema Registry».