Руководство по системному администрированию#

Термины и определения#

Термин/аббревиатура

Определение

ACL

Access Control List, список управления доступом

API

Application Programming Interface, программный интерфейс приложения

CRC32

Cyclic Redundancy Check, циклический избыточный код — 32-битный алгоритм нахождения контрольной суммы, предназначенный для проверки целостности данных

DNS

Domain Name System, служба доменных имен

GSSAPI

Generic Security Services API, общий программный интерфейс сервисов безопасности

ID

Identifier, идентификатор

IP-адрес

Internet Protocol Address, уникальный числовой идентификатор устройства в компьютерной сети

JAAS

Java Authentication and Authorization Service, сервис аутентификации и авторизации Java

JKS

Java Keystore, хранилище сертификатов открытых ключей и авторизации, которое часто используется приложениями на основе Java для шифрования, аутентификации и установки соединений HTTPS

JMX

Java Management Extensions, управленческие расширения Java

JVM

Java Virtual Machine, виртуальная машина Java

LSO

Last Stable Offset, последнее стабильное смещение

PKIX

Public Key Infrastructure for X.509 Certificates, инфраструктура открытого ключа для сертификата X.509

SASL

Simple Authentication and Security Layer, простой уровень аутентификации и безопасности

SSL

Secure Sockets Layer, криптографический протокол, обеспечивающий защищенную передачу данных в компьютерной сети

TCP

Transmission Control Protocol, протокол управления передачей

TLS

Transport Layer Security, протокол защиты транспортного уровня

ОС

Операционная система

Сценарии администрирования#

Конфигурирование Zookeeper#

Конфигурирование Брокера Zookeeper осуществляется в файле config/zookeeper.properties.

Допускается создание файлов конфигурации с различными именами, например:

config/zookeeper-1-node-cluster.properties
config/zookeeper-3-node-cluster.properties
config/zookeeper-3-node-cluster-on-1-machine.properties
config/zookeeper-5-node-cluster.properties

При запуске Zookeeper достаточно указать нужный конфигурационный файл.

Конфигурирование узла Zookeeper#

В данном подразделе описывается конфигурирование Zookeeper для запуска в качестве standalone-приложения на одной машине. Этот вид конфигурации не обеспечивает отказоустойчивости и не рекомендован для использования в промышленной эксплуатации.

Конфигурирование узла Apache Zookeeper осуществляется в файле zookeeper.properties.

Минимальная конфигурация#

Пример файла конфигураций zookeeper.properties для одного Брокера:

dataDir=/path/to/zookeeper-data
clientPort=2181
tickTime=2000

Шаги создания конфигурационного файла:

  1. Для переменной dataDir укажите путь к директории, в которую Zookeeper будет сохранять файлы с данными, необходимыми ему для работы.

    Пример

    dataDir=/var/log/kafka_2.11-0.9.0.0/zookeeper-data

  2. Для переменной clientPort укажите номер порта, к которому будут подключаться клиенты.

    Пример

    clientPort=2181

  3. Для переменной tickTime укажите единицу измерения времени Zookeeper — это время в миллисекундах, которое уходит на один «тик» или «такт» Zookeeper. tickTime неявно используется в настройках для различных временных параметров Zookeeper. Например, для временной настройки initLimit=10 — это означает, что initLimit будет выполняться за 10 * tickTime миллисекунд.

    Пример

    tickTime=2000

Расширенная конфигурация#

Существует ряд дополнительных настроек, которые можно включить в конфигурационный файл:

  • maxClientCnxns — максимально допустимое количество соединений (на уровне сокетов), устанавливаемых клиентом (IP-адрес) с одним из серверов кластера Zookeeper.

    Если значение равно нулю или настройка не указана в файле конфигураций, то количество одновременных подключений считается неограниченным.

    Данная настройка позволяет предотвратить некоторые виды DoS-атак.

    Пример

    maxClientCnxns=0

  • dataLogDir — позволяет выделить отдельную директорию для хранения транзакционного лога Zookeeper. По умолчанию все транзакционные логи Zookeeper (файлы log.) хранятся вместе c файлами snapshot., т. н. «снепшотами», в одной общей директории dataDir.

    Свойство dataLogDir может быть использовано для сохранения транзакционных логов в памяти отдельного устройства, т. е. в памяти другого физического, а не логического диска, для достижения наилучшей производительности (за счет уменьшения нагрузки на диск).

    Пример

    dataLogDir=/another/path/to/zookeeper-logs

    Данные Zookeeper — это персистентные (неизменяемые) копии Zookeeper-узлов (znodes), хранимые соответствующим ансамблем на диске в виде файлов двух видов:

    • Транзакционные логи (transaction log) — это файлы с префиксом log., в которые в последовательном порядке записываются все изменения, происходящие с Zookeeper-узлами.

    • Снепшоты (snapshot) — это файлы с префиксом snapshot. Когда файл транзакционного лога достигает определенного размера, создается копия текущего состояния всех Zookeeper-узлов в виде снепшота. Этот снепшот заменяет все предыдущие логи.

    Примечание

    Устаревшие версии файлов транзакционных логов и снэпшотов не удаляются с диска сервером Zookeeper — эта задача полностью ложится на администратора Zookeeper.

  • globalOutstandingLimit — ограничение времени, позволяющее регулировать количество обращений клиентов к серверу Zookeeper. Поскольку клиенты отправляют запросы быстрее, чем сервер успевает их обработать, сервер может не справиться и израсходовать всю доступную память. По умолчанию число запросов, находящихся в обработке в некоторый момент времени, не превышает 1000.

    Пример

    globalOutstandingLimit=1000

  • preAllocSize — размер блока лог-файлов транзакций. По умолчанию равен 64 Мбайт. Использование транзакционного лога минимизирует количество обращений к диску. Если снэпшоты Zookeeper создаются слишком часто, размер логов транзакций может не достигать 64 Мбайт. В таких ситуациях включение этого параметра позволяет оптимизировать расход объема дискового пространства.

  • snapCount — число транзакций, которые будут записаны в лог транзакций после создания очередного «снапшота» («snapshot») — файла данных Zookeeper. По умолчанию — 10000.

    Пример

    snapCount =10000

  • traceFile — файл трассировки в формате traceFile.year.month.day, в который будут записываться все запросы клиентов. Полезен в целях отладки. Побочный эффект — некоторый спад производительности.

Текущая используемая конфигурация#

Файл конфигураций zookeeper.properties для Брокера следующий:

dataDir=/tmp/kafka/zookeeper
clientPort=2185
maxClientCnxns=0

Конфигурирование кластера Zookeeper#

Подробное описание кластера Zookeeper приведено в документе «Детальная архитектура».

Конфигурирование кластера Apache Zookeeper осуществляется в файле zookeeper.properties.

Минимальная конфигурация кластера#

Пример минимального файла конфигураций zookeeper.properties для кластера из трех серверов:

server.1=zookeeper-1-ip:2888:3888
server.2=zookeeper-2-ip:2888:3888
server.3=zookeeper-3-ip:2888:3888
dataDir=/path/to/zookeeper-data
clientPort=2181
tickTime=2000
initLimit=5
syncLimit=2

Шаги создания конфигурационного файла:

  1. Добавьте в файл переменные server.INDEX=IP:PORT1:PORT2, где:

    • INDEX — индекс (myid) узла Zookeeper в кластере;

    • IP — адрес данного сервера;

    • PORT1 — порт для подключения сервисов-фолловеров в кластере к лидеру;

    • PORT2 — порт для подключения остальных сервисов Zookeeper в момент выбора сервиса-лидера в кластере.

    Если необходимо запустить кластер Zookeeper на нескольких серверах, то в конфигурационном файле укажите адреса всех серверов.

    Пример

    server.1=10.168.1.1:2889:3889
    server.2=10.168.1.2:2889:3889
    
  2. Для переменной dataDir укажите путь к директории, в которую Zookeeper будет сохранять файлы с данными, необходимыми ему для работы.

    Пример

    dataDir=/var/log/kafka_2.11-0.9.0.0/zookeeper-data

  3. Для переменной clientPort укажите номер порта, к которому будут подключаться клиенты.

    Пример

    clientPort=2181

  4. Для переменной tickTime укажите единицу измерения времени Zookeeper — это время в миллисекундах, которое уходит на один «тик» или «такт» Zookeeper. tickTime неявно используется в настройках для различных временных параметров Zookeeper. Например, для временной настройки initLimit=10 — это означает, что initLimit будет выполняться за 10 * tickTime миллисекунд.

    По умолчанию tickTime=2000 миллисекунд.

    Пример

    tickTime=2000

  5. Добавьте в файл переменную initLimit — максимальное допустимое количество «тиков» (tickTime), которое может пройти, прежде чем узел кластера синхронизируется при запуске. При превышении будет сгенерирована ошибка.

    Пример

    initLimit=10

    Т. е. максимальное допустимое время на синхронизацию узла кластера при запуске будет равно (tickTime * initLimit) миллисекунд.

    Примечание

    Это обязательное свойство, без которого узел Zookeeper не будет запущен.

  6. Добавьте в файл переменную syncLimit — максимальное допустимое количество «тиков» (tickTime) между отправкой запроса и получением ответа, прежде чем будет сгенерировано исключение.

    Пример

    syncLimit=5

    Примечание

    Это обязательное свойство, без которого узел Zookeeper не будет запущен.

На этом минимальное конфигурирование кластера Zookeeper завершается.

Расширенная конфигурация кластера#

Существует ряд дополнительных настроек, которые можно включить в конфигурационный файл:

  • electionAlg — тип алгоритма для выбора лидера. Возможные значения:

    • 0 — версия алгоритма для выбора лидера — это оригинальная версия на основе UDP (User Datagram Protocol);

    • 1 — версия алгоритма для быстрого выбора лидера на основе UDP, неавторизованный;

    • 2 — версия алгоритма для быстрого выбора лидера на основе версии 1, но авторизованный;

    • 3 — версия алгоритма для быстрого выбора лидера на основе TCP.

    Версия 3 выбирается по умолчанию, версии 0, 1 и 2 носят статус устаревших и будут удалены в следующем релизе.

    Пример

    electionAlg=3

  • leaderServes — опция разрешает клиентам подключаться к серверу-лидеру. Значение по умолчанию — yes. Одна из основных задач сервера-лидера — следить за обновлениями и координировать. Отключением этой опции (установкой значения no) можно добиться некоторого прироста производительности при координации обновлений.

    Пример

    leaderServes=yes

  • Свойства, используемые совместно:

    • group.x=nnnn[:nnnn] — позволяет построить кворум с иерархической структурой, где x — это идентификатор кворума, а в качестве значения указывается список идентификаторов серверов, разделенных двоеточием.

      Примечание

      Группы не должны пересекаться, т. е. объединение всех групп должно давать полный список всех серверов.

    • weight.x=nnnnn — при использовании вместе с group.x=nnnn[:nnnn] позволяет назначить «вес» каждому серверу, который будет учитываться при выборе лидера. По умолчанию вес равен 1.

    Пример

    group.1=1:2:3
    group.2=4:5:6
    group.3=7:8:9
    weight.1=1
    weight.2=1
    weight.3=1
    weight.4=1
    weight.5=1
    weight.6=1
    weight.7=1
    weight.8=1
    weight.9=1
    

Следует учитывать, что запуск всех узлов кластера должен происходить за промежуток времени, не превышающий 10 тиков (heartbeat), заданных в строке initLimit=10. Поэтому лучше создать скрипт последовательного запуска всех узлов кластера с одной машины.

Запуск каждого узла Zookeeper выполняется командой:

bin/zookeeper-server-start.sh -daemon
config/zookeeper.properties

Или с параллельным выводом файла логов в консоль:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail --f logs/zookeeper.out

Запуск всех узлов Zookeeper с одного сервера выполняется специальным скриптом:

bin/zookeeper-cluster-start.sh
config/zookeeper-cluster.properties

Где config/zookeeper-cluster.properties — файл конфигураций кластера с IP-адресами всех серверов, с которых производится запуск, а также пользователей, путей к дистрибутиву Platform V Corax и имен файлов конфигураций Zookeeper-брокеров. Формат файла приведен в таблице ниже.

IP-адрес сервера

Имя пользователя

Путь к дистрибутиву Platform V Corax

Файл конфигураций Zookeeper

10.116.93.87

mon99usr

kafka\2.11-0.9.0.0/

zookeeper.properties

10.116.93.88

mon99usr

kafka\2.11-0.9.0.0/

zookeeper.properties

10.116.97.221

mon99usr

kafka\2.11-0.9.0.0/

zookeeper.properties

Проверка работоспособности#

При запуске всех узлов кластера Zookeeper в файлах logs/zookeeper.out должны быть следующие записи:

  • В файлах logs/zookeeper.out серверов-фолловеров:

    INFO FOLLOWING - LEADER ELECTION TOOK - 847
    (org.apache.zookeeper.server.quorum.Learner)
    
  • В файле logs/zookeeper.out сервера-лидера:

    INFO LEADING - LEADER ELECTION TOOK - 408
    (org.apache.zookeeper.server.quorum.Leader)
    

Также в консоли сервера-лидера должна быть запись, информирующая о создании кворума в кластере с перечислением ID узлов:

INFO Have quorum of supporters, sids: [ 1,2 ]; starting up and setting
last processed zxid: 0x100000000
(org.apache.zookeeper.server.quorum.Leader)

Тестовый запуск кластера Zookeeper (2 узла)#

Тестовый запуск кластера Zookeeper из двух узлов производится на двух удаленных серверах:

Server-1: 10.116.93.87
Server-2: 10.116.93.88

Дополнительная информация:

  • версия дистрибутива Platform V Corax: kafka\2.11-0.9.0.0;

  • версия Zookeeper: zookeeper-3.4.6;

  • имя пользователя: mon99usr;

  • \$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где \$KAFKA_HOME — корневая директория дистрибутива Platform V Corax.

Для тестового запуска кластера Zookeeper выполните следующие действия:

  1. На каждом из серверов создайте директорию, где будут храниться данные Zookeeper:

    mkdir --p /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/
    
  2. На каждом из серверов создайте файл myid в директории /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/, созданной на предыдущем шаге (1). Запишите в файл соответствующий уникальный идентификатор узла Zookeeper:

    • на сервере Server-1 в файл myid запишите 1:

      echo 1 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      1
      
    • на сервере Server-2 в файл myid запишите 2:

      echo 2 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      2
      
  3. Внесите следующие настройки в файл конфигурации \$KAFKA_HOME/config/zookeeper.config для обоих серверов:

    server.1=10.116.93.87:2888:3888
    server.2=10.116.93.88:2888:3888
    clientPort=3181
    dataDir=/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/
    maxClientCnxns=60
    tickTime=2000
    initLimit=10
    syncLimit=5
    

    Где:

    • dataDir — директория для хранения данных Zookeeper, которая была создана на шаге (1).

    • server.1 — адрес Server-1; число 1 в server.1 — это myid сервера Server-1;

    • server.2 — адрес Server-2; число 2 в server.2 — это myid сервера Server-2.

  4. Запустите кластер Zookeeper из двух узлов:

    • Выполните запуск узла Zookeeper на Server-1:

      bin/zookeeper-server-start.sh --daemon config/zookeeper.config
      && tail --f logs/zookeeper.out
      
      [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0] bin/zookeeper-server-start.sh config/zookeeper.properties
      
      [2016-03-22 15:25:19,172] INFO Reading configuration from: config/zookeeper.properties
      
      [2016-03-22 15:25:19,174] WARN No server failure will be tolerated. You need at least 3 servers
      
      [2016-03-22 15:25:19,174] INFO Defaulting to majority quorums
      
      [2016-03-22 15:25:19,179] INFO autopurge.snapRetainCount set to 3
      
      [2016-03-22 15:25:19,179] INFO autopurge.purgeInterval set to 0
      
      [2016-03-22 15:25:19,179] INFO Purge task is not scheduled.
      
      [2016-03-22 15:25:19,198] INFO Starting quorum peer
      
      [2016-03-22 15:25:19,210] INFO binding to port 0.0.0.0/0.0.0.0:3181
      
      [2016-03-22 15:25:19,234] INFO tickTime set to 2000
      
      [2016-03-22 15:25:19,234] INFO minSessionTimeout set to -1
      
      [2016-03-22 15:25:19,234] INFO maxSessionTimeout set to -1
      
      [2016-03-22 15:25:19,234] INFO initLimit set to 10
      
      [2016-03-22 15:25:19,247] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001
      
      [2016-03-22 15:25:19,260] INFO My election bind port: 10.116.93.87:3888
      
      [2016-03-22 15:25:19,270] INFO New election. My id = 1, proposed zxid=0x700000001
      
      [2016-03-22 15:25:19,272] INFO Notification: 1 (message format version), 1 (n.leader), 0x700000001 (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x8 (n.peerEpoch) LOOKING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection)
      
      [2016-03-22 15:25:19,284] WARN Cannot open channel to 2 at election address /10.116.93.88:3888
      
      java.net.ConnectException: Connection refused
      
      at java.net.PlainSocketImpl.socketConnect(Native Method)
      
      ...
      
      [2016-03-22 15:25:19,892] INFO Notification time out: 800 (org.apache.zookeeper.server.quorum.FastLeaderElection)
      
      [2016-03-22 15:25:20,172] INFO Received connection request /10.116.93.88:45491 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
      
      [2016-03-22 15:25:20,379] INFO FOLLOWING
      
      [2016-03-22 15:25:20,384] INFO TCP NoDelay set to: true
      
      [2016-03-22 15:25:20,390] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
      
      [2016-03-22 15:25:20,390] INFO Server environment:host.name=SBT-IPO-203.ca.sbrf.ru
      
      [2016-03-22 15:25:20,390] INFO Server environment:java.version=1.8.0_60
      
      [2016-03-22 15:25:20,390] INFO Server environment:java.home=/usr/java/jre1.8.0_60
      
      [2016-03-22 15:25:20,390] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/..
      
      [2016-03-22 15:25:20,391] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
      
      [2016-03-22 15:25:20,391] INFO Server environment:java.io.tmpdir=/tmp
      
      [2016-03-22 15:25:20,391] INFO Server environment:java.compiler=\<NA\>
      
      [2016-03-22 15:25:20,391] INFO Server environment:os.name=Linux
      
      [2016-03-22 15:25:20,391] INFO Server environment:os.arch=amd64
      
      [2016-03-22 15:25:20,391] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64
      
      [2016-03-22 15:25:20,391] INFO Server environment:user.name=mon99usr
      
      [2016-03-22 15:25:20,391] INFO Server environment:user.home=/home/mon99usr
      
      [2016-03-22 15:25:20,391] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0
      
      [2016-03-22 15:25:20,393] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
      
      [2016-03-22 15:25:20,394] INFO FOLLOWING - LEADER ELECTION TOOK - 1124 (org.apache.zookeeper.server.quorum.Learner)
      
      [2016-03-22 15:25:20,421] INFO Getting a diff from the leader 0x700000001 (org.apache.zookeeper.server.quorum.Learner)
      
      [2016-03-22 15:25:20,425] INFO Snapshotting: 0x700000001 to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001
      

      Запуск выполнен успешно.

    • Выполните запуск узла Zookeeper на Server-2:

      bin/zookeeper-server-start.sh --daemon config/zookeeper.config
      && tail --f logs/zookeeper.out
      
      [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$
      bin/zookeeper-server-start.sh config/zookeeper.properties
      
      [2016-03-22 15:25:19,874] INFO Reading configuration from:
      config/zookeeper.properties
      
      [2016-03-22 15:25:19,878] WARN No server failure will be tolerated. You need at least 3 servers.
      
      [2016-03-22 15:25:19,878] INFO Defaulting to majority quorums
      
      [2016-03-22 15:25:19,889] INFO autopurge.snapRetainCount set to 3
      
      [2016-03-22 15:25:19,889] INFO autopurge.purgeInterval set to 0
      
      [2016-03-22 15:25:19,889] INFO Purge task is not scheduled.
      
      [2016-03-22 15:25:19,941] INFO Starting quorum peer
      
      [2016-03-22 15:25:19,971] INFO binding to port 0.0.0.0/0.0.0.0:3181
      
      [2016-03-22 15:25:20,037] INFO tickTime set to 2000
      
      [2016-03-22 15:25:20,037] INFO minSessionTimeout set to -1
      
      [2016-03-22 15:25:20,037] INFO maxSessionTimeout set to -1
      
      [2016-03-22 15:25:20,038] INFO initLimit set to 10
      
      [2016-03-22 15:25:20,128] INFO My election bind port: /10.116.93.88:3888
      
      [2016-03-22 15:25:20,152] INFO LOOKING
      
      [2016-03-22 15:25:20,155] INFO New election. My id = 2, proposed zxid=0x700000001
      
      [2016-03-22 15:25:20,176] INFO Notification: 1 (message format version), 2 (n.leader), 0x700000001 (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x8 (n.peerEpoch) LOOKING (my state) (org.apache.zookeeper.server.quorum.FastLeaderElection)
      
      ...
      
      [2016-03-22 15:25:20,381] INFO LEADING
      
      [2016-03-22 15:25:20,386] INFO TCP NoDelay set to: true
      
      [2016-03-22 15:25:20,395] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
      
      [2016-03-22 15:25:20,395] INFO Server environment:host.name=SBT-IPO-204.ca.sbrf.ru
      
      [2016-03-22 15:25:20,395] INFO Server environment:java.version=1.8.0_66
      
      [2016-03-22 15:25:20,395] INFO Server environment:java.home=/usr/java/jre1.8.0_66
      
      [2016-03-22 15:25:20,395] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../libs/scala- (org.apache.zookeeper.server.ZooKeeperServer)
      
      [2016-03-22 15:25:20,396] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
      
      [2016-03-22 15:25:20,396] INFO Server environment:java.io.tmpdir=/tmp
      
      [2016-03-22 15:25:20,396] INFO Server environment:java.compiler=\<NA\>
      
      [2016-03-22 15:25:20,396] INFO Server environment:os.name=Linux
      
      [2016-03-22 15:25:20,396] INFO Server environment:os.arch=amd64
      
      [2016-03-22 15:25:20,396] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64
      
      [2016-03-22 15:25:20,396] INFO Server environment:user.name=mon99usr
      
      [2016-03-22 15:25:20,396] INFO Server environment:user.home=/home/mon99usr
      
      [2016-03-22 15:25:20,396] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0
      
      [2016-03-22 15:25:20,397] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 (org.apache.zookeeper.server.ZooKeeperServer)
      
      [2016-03-22 15:25:20,398] INFO LEADING - LEADER ELECTION TOOK - 243 (org.apache.zookeeper.server.quorum.Leader)
      
      [2016-03-22 15:25:20,410] INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer\@64c4b35a
      
      [2016-03-22 15:25:20,419] INFO Synchronizing with Follower sid: 1 maxCommittedLog=0x700000001 minCommittedLog=0x300000001 peerLastZxid=0x700000001
      
      [2016-03-22 15:25:20,419] INFO Sending DIFF
      
      [2016-03-22 15:25:20,431] INFO Received NEWLEADER-ACK message from 1
      
      [2016-03-22 15:25:20,433] INFO Have quorum of supporters, sids: [ 1,2 ]; starting up and setting last processed zxid: 0x900000000 (org.apache.zookeeper.server.quorum.Leader)
      

      Запуск выполнен успешно.

Пояснения к логам Zookeeper на сервере Server-1:

  • WARN No server failure will be tolerated. You need at least 3 servers

    В случае отказа одного из серверов Zookeeper сервис Zookeeper не сможет продолжить свою работу, т. к. нарушено правило кворума о строгом большинстве доступных узлов. Т. е. число работоспособных узлов в Zookeeper-кластере всегда должно быть больше числа неработоспособных узлов. В данном случае узла два и отказ одного из узлов приведет к неработоспособности сервиса.

  • WARN Cannot open channel to 2 at election address /10.116.93.88:3888 java.net.ConnectException: Connection refused

    Отказ при попытке соединения с Server-2, т. к. Server-2 в данный момент еще не был запущен.

  • INFO Notification time out: 800 (org.apache.zookeeper.server.quorum.FastLeaderElection)

    Тайм-аут при попытке выбрать лидера.

  • INFO Received connection request /10.116.93.88:45491

    Получен ответ на запрос о соединении с сервером Server-2.

  • INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2

    Описание конфигурации созданного сервера.

  • INFO Snapshotting: 0x700000001 to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.700000001

    Сохранение образа текущего состояния Zookeeper.

  • INFO FOLLOWING

    После успешного соединения Server-1 с Server-2:

    • Server-1 становится сервером-последователем;

    • Server-2 становится сервером-лидером.

  • FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn\'t match stored brokerId 1 in meta.properties

    Данная ошибка запуска Zookeeper на сервере Server-2 происходит по причине конфликта в служебный файл Zookeeper meta.properties записался неверный идентификатор брокера broker.id=1, в то время как идентификатор myid сервера Server-2 указан, как равный двум. Решением проблемы будет удаление файла meta.properties, тогда при новом запуске Zookeeper этот файл будет сгенерирован автоматически и с корректными настройками.

Пояснения к логам Zookeeper на сервере Server-2:

  • INFO LOOKING

    INFO New election. My id = 2, proposed zxid=0x700000001...

    INFO LEADING -- LEADING ELECTION TOOK -- 250

    Поиск сервера-лидера (т. к. сервер-лидер еще не выбран), голосование на выбор сервера-лидера (каждый из серверов предлагает себя), сервером-лидером выбран Server-2, а Server-1 автоматически становится сервером-последователем.

  • WARN Cannot open channel to 2 at election address /10.116.93.88:3888

    (org.apache.zookeeper.server.quorum.QuorumCnxManager)

    java.net.ConnectException: Connection refused

    Попытка подключиться к отключенному серверу для выбора лидера. Сообщение возникает, если отключить Server-1 или Server-2 во время работы обоих серверов.

Тестовый запуск кластера Zookeeper (3 узла)#

Тестовый запуск кластера Zookeeper из трех узлов и кластера Platform V Corax из трех узлов будет производиться на трех удаленных серверах:

Server-1: 10.116.93.87 [ 1 Брокер Zookeeper, 1 Брокер Kafka ]
Server-2: 10.116.93.88 [ 1 Брокер Zookeeper, 1 Брокер Kafka ]
Server-3: 10.116.97.221 [ 1 Брокер Zookeeper, 1 Брокер Kafka ]

Дополнительная информация:

  • версия дистрибутива Platform V Corax: kafka\2.11-0.9.0.0;

  • версия Zookeeper: zookeeper-3.4.6;

  • имя пользователя: mon99usr;

  • \$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где \$KAFKA_HOME — домашняя корневая директория дистрибутива Platform V Corax.

Конфигурация кластера Zookeeper из трех узлов

Каждый узел располагается на отдельном сервере.

Для конфигурации кластера Zookeeper выполните следующие действия:

  1. На каждом из серверов создайте директорию, где будут храниться данные Zookeeper:

    mkdir --p /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/
    
  2. На каждом из серверов создайте файл myid в директории /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/, созданной на предыдущем шаге (1). Это уникальный идентификатор каждого узла Zookeeper, запишите его в файл:

    • на сервере Server-1 в файл myid запишите 1:

      echo 1 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      1
      
    • на сервере Server-2 в файл myid запишите 2:

      echo 2 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      2
      
    • на сервере Server-3 в файл myid запишите 3:

      echo 3 > /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      cat /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/myid
      3
      
  3. Внестие следующие настройки в файл конфигурации. \$KAFKA_HOME/config/zookeeper.config для трех серверов одинаков:

    server.1=10.116.93.87:2888:3888
    server.2=10.116.93.88:2888:3888
    server.3=10.116.97.221:2888:3888
    clientPort=3181
    dataDir=/var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/
    tickTime=2000
    initLimit=10
    syncLimit=5
    maxClientCnxns=60
    

    Где:

    • dataDir — директория для хранения данных Zookeeper, которая была создана на шаге (1);

    • server.1 — адрес Server-1; число 1 в server.1 — это myid сервера Server-1;

    • server.2 — адрес Server-2; число 2 в server.2 — это myid сервера Server-2;

    • server.3 — адрес Server-3; число 3 в Server.3 — это myid сервера Server-3.

Запуск кластера Zookeeper из трех узлов

  1. Выполните запуск узла Zookeeper на сервере Server-1 в режиме демона с выводом логов:

    bin/zookeeper-server-start.sh -daemon
    config/zookeeper.properties && tail -f logs/zookeeper.out
    [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0]\$
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &&
    tail -f logs/zookeeper.out
    
    [2016-03-28 12:56:43,375] INFO Reading configuration from: config/zookeeper.properties
    
    [2016-03-28 12:56:43,377] INFO Defaulting to majority quorums
    
    [2016-03-28 12:56:43,382] INFO autopurge.snapRetainCount set to 3
    
    [2016-03-28 12:56:43,382] INFO autopurge.purgeInterval set to 0
    
    [2016-03-28 12:56:43,382] INFO Purge task is not scheduled.
    
    [2016-03-28 12:56:43,411] INFO Starting quorum peer
    
    [2016-03-28 12:56:43,424] INFO binding to port 0.0.0.0/0.0.0.0:3181
    
    [2016-03-28 12:56:43,447] INFO tickTime set to 2000
    
    [2016-03-28 12:56:43,448] INFO minSessionTimeout set to -1
    
    [2016-03-28 12:56:43,448] INFO maxSessionTimeout set to -1
    
    [2016-03-28 12:56:43,448] INFO initLimit set to 10
    
    [2016-03-28 12:56:43,467] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac
    
    [2016-03-28 12:56:43,507] INFO My election bind port: /10.116.93.87:3888
    
    [2016-03-28 12:56:43,518] INFO LOOKING
    
    [2016-03-28 12:56:43,520] INFO New election. My id = 1, proposed zxid=0x310000003d
    
    [2016-03-28 12:56:43,522] INFO Notification: 1 (message format version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:43,530] WARN Cannot open channel to 2 at election address /10.116.93.88:3888
    
    java.net.ConnectException: Connection refused
    
    [2016-03-28 12:56:43,534] WARN Cannot open channel to 3 at election address /10.116.97.221:3888
    
    java.net.ConnectException: Connection refused
    
    [2016-03-28 12:56:56,388] INFO Received connection request /10.116.93.88:11287 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
    
    [2016-03-28 12:56:56,595] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer)
    
    [2016-03-28 12:56:56,600] INFO TCP NoDelay set to: true
    
    [2016-03-28 12:56:56,606] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    
    [2016-03-28 12:56:56,607] INFO Server environment:host.name=SBT-IPO-203.ca.sbrf.ru
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.version=1.8.0_60
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.home=/usr/java/jre1.8.0_60
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../....
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.library.path=/usr/java/packages/...
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.io.tmpdir=/tmp
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.compiler=\<NA\>
    
    [2016-03-28 12:56:56,607] INFO Server environment:os.name=Linux
    
    [2016-03-28 12:56:56,607] INFO Server environment:os.arch=amd64
    
    [2016-03-28 12:56:56,607] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64
    
    [2016-03-28 12:56:56,607] INFO Server environment:user.name=mon99usr
    
    [2016-03-28 12:56:56,607] INFO Server environment:user.home=/home/mon99usr
    
    [2016-03-28 12:56:56,607] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0
    
    [2016-03-28 12:56:56,609] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 Datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
    
    [2016-03-28 12:56:56,609] INFO FOLLOWING - LEADER ELECTION TOOK - 13090 (org.apache.zookeeper.server.quorum.Learner)
    
    [2016-03-28 12:56:56,616] WARN Unexpected exception, tries=0, connecting to /10.116.93.88:2888
    
    java.net.ConnectException: Connection refused
    
    ...
    
    [2016-03-28 12:56:57,640] INFO Getting a diff from the leader 0x310000003d
    
    [2016-03-28 12:56:57,645] INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003d
    
    [2016-03-28 12:57:04,004] WARN Got zxid 0x3200000001 expected 0x1
    
    [2016-03-28 12:57:04,004] INFO Creating new log file: log.3200000001
    
    [2016-03-28 12:57:06,472] INFO Received connection request /10.116.97.221:47171
    
    [2016-03-28 12:57:06,475] INFO Notification: 1 (message format version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) FOLLOWING (my state)
    

    Где tail -f logs/zookeeper.out — вывод текущего лога Zookeeper-брокера из файла в реальном времени.

    Запуск Zookeeper-брокера на Server-1 выполнен успешно.

    Краткий анализ логов

    • Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере 10.116.93.88 и с Брокером на сервере 10.116.97.221, т. к. Брокеры на этих серверах еще не запущены:

      INFO LOOKING
      
      WARN Cannot open channel to 2 at election address /10.116.93.88:3888
      
      java.net.ConnectException: Connection refused
      
      WARN Cannot open channel to 3 at election address /10.116.97.221:3888
      
      java.net.ConnectException: Connection refused
      
    • Принят запрос на соединение от Брокера на сервере Server-2 по адресу 10.116.93.88 в связи со запуском этого Брокера:

      INFO Received connection request /10.116.93.88:11287
      

      Как правило, после этого сообщения новый Брокер включается в кластер.

    • Выбран сервер-лидер — это только что запущенный Брокер на сервере Server-2. Брокер на текущем сервере Server-1 выбран в качестве сервера-последователя (FOLLOWER):

      INFO FOLLOWING
      
    • Синхронизация Брокера-последователя на сервере Server-1 с Брокером-лидером на сервере Server-2: получение от сервера-лидера Server-2 текущего состояния системы (diff). В результате на сервере Server-1 будет создан и сохранен соответствующий образ (snapshot) только что полученных данных:

      INFO Getting a diff from the leader 0x310000003d
      
      INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003d
      
    • Принят запрос на соединение от Брокера на сервере Server-3 по адресу 10.116.97.221 в связи со запуском этого Брокера:

      INFO Received connection request /10.116.97.221:11287
      

      С этого момента новый Брокер включается в кластер.

    • После сообщения INFO Notification: 1... вывод логов прекращается в связи с отсутствием дальнейших событий.

  2. Выполните запуск узла Zookeeper на сервере Server-2 в режиме демона с выводом логов:

    bin/zookeeper-server-start.sh -daemon
    config/zookeeper.properties && tail -f logs/zookeeper.out
    [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &&
    tail -f logs/zookeeper.out
    
    [2016-03-28 12:56:56,055] INFO Reading configuration from:
    config/zookeeper.properties
    
    [2016-03-28 12:56:56,058] INFO Defaulting to majority quorums
    
    [2016-03-28 12:56:56,067] INFO autopurge.snapRetainCount set to 3
    
    [2016-03-28 12:56:56,067] INFO autopurge.purgeInterval set to 0
    
    [2016-03-28 12:56:56,067] INFO Purge task is not scheduled.
    
    [2016-03-28 12:56:56,116] INFO Starting quorum peer
    
    [2016-03-28 12:56:56,135] INFO binding to port 0.0.0.0/0.0.0.0:3181
    
    [2016-03-28 12:56:56,184] INFO tickTime set to 2000
    
    [2016-03-28 12:56:56,185] INFO minSessionTimeout set to -1
    
    [2016-03-28 12:56:56,185] INFO maxSessionTimeout set to -1
    
    [2016-03-28 12:56:56,185] INFO initLimit set to 10
    
    [2016-03-28 12:56:56,211] INFO Reading snapshot
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.2a000000a9
    
    [2016-03-28 12:56:56,375] INFO My election bind port:
    /10.116.93.88:3888
    
    [2016-03-28 12:56:56,378] INFO LOOKING
    (org.apache.zookeeper.server.quorum.QuorumPeer)
    
    [2016-03-28 12:56:56,380] INFO New election. My id = 2, proposed zxid=0x310000003d
    
    [2016-03-28 12:56:56,394] INFO Notification: 1 (message format
    version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING
    (n.state), 2 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:56,396] WARN Cannot open channel to 3 at election address /10.116.97.221:3888
    
    java.net.ConnectException: Connection refused
    
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    
    ...
    
    [2016-03-28 12:56:56,398] INFO Notification: 1 (message format
    version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:56,399] INFO Notification: 1 (message format
    version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:56,601] INFO LEADING
    
    [2016-03-28 12:56:56,605] INFO TCP NoDelay set to: true
    
    [2016-03-28 12:56:56,615] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    
    [2016-03-28 12:56:56,615] INFO Server environment:host.name=SBT-IPO-204.ca.sbrf.ru
    
    [2016-03-28 12:56:56,615] INFO Server environment:java.version=1.8.0_66
    
    [2016-03-28 12:56:56,615] INFO Server environment:java.home=/usr/java/jre1.8.0_66
    
    [2016-03-28 12:56:56,615] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/....
    
    [2016-03-28 12:56:56,616] INFO Server environment:java.library.path=/usr/java/...
    
    [2016-03-28 12:56:56,616] INFO Server environment:java.io.tmpdir=/tmp
    
    [2016-03-28 12:56:56,616] INFO Server environment:java.compiler=\<NA\>
    
    [2016-03-28 12:56:56,616] INFO Server environment:os.name=Linux
    
    [2016-03-28 12:56:56,616] INFO Server environment:os.arch=amd64
    
    [2016-03-28 12:56:56,616] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64
    
    [2016-03-28 12:56:56,616] INFO Server environment:user.name=mon99usr
    
    [2016-03-28 12:56:56,616] INFO Server environment:user.home=/home/mon99usr
    
    [2016-03-28 12:56:56,616] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0
    
    [2016-03-28 12:56:56,618] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000
    
    datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
    
    [2016-03-28 12:56:56,619] INFO LEADING - LEADER ELECTION TOOK - 239
    
    [2016-03-28 12:56:57,627] INFO Follower sid: 1 : info :
    org.apache.zookeeper.server.quorum.QuorumPeer\$...
    
    [2016-03-28 12:56:57,638] INFO Synchronizing with Follower sid: 1
    maxCommittedLog=0x310000003d minCommittedLog=0x2e0000006f
    peerLastZxid=0x310000003d
    
    [2016-03-28 12:56:57,638] INFO Sending DIFF
    
    [2016-03-28 12:56:57,666] INFO Received NEWLEADER-ACK message from 1
    
    [2016-03-28 12:56:57,669] INFO Have quorum of supporters, sids: [ 1,2]; starting up and setting last processed zxid: ..
    
    [2016-03-28 12:57:04,000] INFO Expiring session 0x253bc816e870000, timeout of 6000ms exceeded
    
    [2016-03-28 12:57:04,001] INFO Processed session termination for sessionid: 0x253bc816e870000
    
    [2016-03-28 12:57:04,002] INFO Creating new log file: log.3200000001
    
    [2016-03-28 12:57:06,475] INFO Received connection request
    /10.116.97.221:34164
    
    [2016-03-28 12:57:06,477] INFO Notification: 1 (message format
    version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) LEADING (my state)
    
    [2016-03-28 12:57:06,513] INFO Follower sid: 3 : info :
    org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer@...
    
    [2016-03-28 12:57:06,517] INFO Synchronizing with Follower sid: 3
    maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070
    peerLastZxid=0x310000003d
    
    [2016-03-28 12:57:06,518] INFO Sending DIFF
    
    [2016-03-28 12:57:06,540] INFO Received NEWLEADER-ACK message from 3
    

    Запуск брокера на сервере Server-2 выполнен успешно.

    Краткий анализ логов

    • Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере 10.116.97.221, т. к. Брокер на этом сервере еще не запущен:

      INFO LOOKING
      
      WARN Cannot open channel to 3 at election address /10.116.97.221:3888
      
      java.net.ConnectException: Connection refused
      
    • Лидером выбран Брокер на сервере Server-2 (на выбор лидера ушло 239 миллисекунд). От Брокера на сервере Server-1 получено соответствующее подтверждение. Выведено сообщение о том, что создан новый кворум [1,2] из двух сидов-Брокеров с идентификаторами myid = 1 и 2 соответственно.

      INFO LEADING
      
      INFO LEADING - LEADER ELECTION TOOK -- 239
      
      INFO Have quorum of supporters, sids: [ 1,2 ];
      
    • Синхронизация Брокера-последователя на сервере Server-1 с Брокером-лидером на сервере Server-2. Брокер-лидер отправляет текущее состояние системы (DIFF) и тем самым запускает синхронизацию. После обработки DIFF Брокером-последователем (сохранения DIFF в виде образа (snapshot) текущего состояния системы) от Брокера-последователя поступает соответствующее подтверждение о получении Received NEWLEADER-ACK:

      INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.Q...
      
      INFO Synchronizing with Follower sid: 1
      
      INFO Sending DIFF
      
      INFO Received NEWLEADER-ACK message from 1
      
    • Принят запрос на соединение от нового Брокера на сервере Server-3 по адресу 10.116.97.221 в связи со запуском этого Брокера:

      INFO Received connection request /10.116.97.221:34164
      
    • Новый Брокер с идентификатором myid=3 принят в кворум в качестве Брокера-последователя:

      INFO Follower sid: 3
      
    • Синхронизация Брокера-последователя на сервере Server-3 с Брокером-лидером на сервере Server-2. Брокеру-последователю отправлено текущее состояние системы (DIFF), успешно обработано Брокером-последователем, в ответ выслано соответствующее подтверждение:

      INFO Synchronizing with Follower sid: 3 maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070 peerLastZxid=0x310000003d
      
      [2016-03-28 12:57:06,518] INFO Sending DIFF
      
      [2016-03-28 12:57:06,540] INFO Received NEWLEADER-ACK message from 3
      
  3. Выполните запуск узла Zookeeper на сервере Server-3 в режиме демона с выводом логов:

    > bin/zookeeper-server-start.sh -daemon
    
    config/zookeeper.properties && tail -f logs/zookeeper.out
    
    [mon99usr\@SBT-IPO-208 kafka_2.11-0.9.0.0]\$
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out
    
    [2016-03-28 12:57:07,273] INFO Reading configuration from:
    config/zookeeper.properties
    
    [2016-03-28 12:57:07,276] INFO Defaulting to majority quorums
    
    [2016-03-28 12:57:07,282] INFO autopurge.snapRetainCount set to 3
    
    [2016-03-28 12:57:07,282] INFO autopurge.purgeInterval set to 0
    
    [2016-03-28 12:57:07,282] INFO Purge task is not scheduled.
    
    [2016-03-28 12:57:07,303] INFO Starting quorum peer
    
    [2016-03-28 12:57:07,321] INFO binding to port 0.0.0.0/0.0.0.0:3181
    
    [2016-03-28 12:57:07,357] INFO tickTime set to 2000
    
    [2016-03-28 12:57:07,357] INFO minSessionTimeout set to -1
    
    [2016-03-28 12:57:07,357] INFO maxSessionTimeout set to -1
    
    [2016-03-28 12:57:07,357] INFO initLimit set to 10
    
    [2016-03-28 12:57:07,378] INFO Reading snapshot
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac
    
    [2016-03-28 12:57:07,430] INFO My election bind port:
    /10.116.97.221:3888
    
    [2016-03-28 12:57:07,444] INFO LOOKING
    
    [2016-03-28 12:57:07,445] INFO New election. My id = 3, proposed zxid=0x31000003d
    
    [2016-03-28 12:57:07,459] INFO Notification: 1 (message format
    version), 3 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,460] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,460] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), FOLLOWING (n.state), 1 (n.sid), 0x2 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,461] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x31(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,462] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LEADING (n.state), 2 (n.sid), 0x32(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,462] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer)
    
    [2016-03-28 12:57:07,469] INFO TCP NoDelay set to: true
    
    [2016-03-28 12:57:07,478] INFO Server
    environment:zookeeper.version=3.4.6-156995, built on 02/20/2014 09:09 GMT
    
    [2016-03-28 12:57:07,478] INFO Server environment:host.name=SBT-IPO-208.ca.sbrf.ru
    
    [2016-03-28 12:57:07,487] INFO Server environment:java.version=1.8.0_71
    
    [2016-03-28 12:57:07,487] INFO Server
    environment:java.home=/usr/java/jre1.8.0_1
    
    [2016-03-28 12:57:07,487] INFO Server
    environment:java.class.path=:/home/mon99ur/...
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:java.library.path=/usr/java/packages/...
    
    [2016-03-28 12:57:07,488] INFO Server environment:java.io.tmpdir=/tmp
    
    [2016-03-28 12:57:07,488] INFO Server environment:java.compiler=\<NA\>
    
    [2016-03-28 12:57:07,488] INFO Server environment:os.name=Linux
    
    [2016-03-28 12:57:07,488] INFO Server environment:os.arch=amd64
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:os.version=3.10.0-123.9.3.el7x86_64
    
    [2016-03-28 12:57:07,488] INFO Server environment:user.name=mon99usr
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:user.home=/home/mon99usr
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:user.dir=/home/mon99usr/kafka2.11-0.9.0.0
    
    [2016-03-28 12:57:07,490] INFO Created server with tickTime 2000
    minSessionTimeout 4000 maxSessionTimeout 40000 datadir
    
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
    
    [2016-03-28 12:57:07,491] INFO FOLLOWING - LEADER ELECTION TOOK - 46
    
    [2016-03-28 12:57:07,502] INFO Getting a diff from the leader
    0x3200000001
    
    [2016-03-28 12:57:07,506] WARN Got zxid 0x3200000001 expected 0x1
    
    [2016-03-28 12:57:07,508] INFO Snapshotting: 0x3200000001 to
    /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001
    

    Запуск брокера на сервере Server-3 выполнен успешно.

    Краткий анализ логов

    • Поиск других Брокеров с целью выбора лидера, инициация алгоритма выбора лидера, предложение своей кандидатуры в качестве лидера аналогичны инициализации двух предыдущих Брокеров:

      INFO LOOKING
      
      INFO New election. My id = 3, proposed zxid=0x31000003d
      
    • Брокер на сервере Server-3 выбран в качестве Брокера-последователя:

      INFO FOLLOWING
      
      INFO FOLLOWING - LEADER ELECTION TOOK -- 46
      
    • Синхронизация с Брокером на сервере Server-2. Получение состояния системы и сохранение образа:

      INFO Getting a diff from the leader 0x3200000001
      
      INFO Snapshotting: 0x3200000001 to /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001
      

Конфигурация кластера Platform V Corax из трех узлов

Каждый узел расположен на отдельном сервере.

В целях тестирования доступности сервиса Zookeeper с различным количеством работающих узлов сконфигурируйте кластер Platform V Corax:

  1. На каждом из серверов создайте директорию, где будут храниться логи Platform V Corax:

    mkdir --p /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/
    
  2. Создайте файл конфигурации \$KAFKA_HOME/config/server.config для каждого Platform V Corax-брокера.

    • Для сервера Server-1:

      broker.id=1
      
      host.name=10.116.93.87
      
      log.dirs=/var/tmp/kafka_2.11-0.9.0.0/kafka-logs
      
      zookeeper.connect=10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181
      
    • Для сервера Server-2:

      broker.id=2
      
      host.name=10.116.93.88
      
      log.dirs=/var/tmp/kafka_2.11-0.9.0.0/kafka-logs
      
      zookeeper.connect=10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181
      
    • Для сервера Server-3:

      broker.id=3
      
      host.name=10.116.97.221
      
      log.dirs=/var/tmp/kafka_2.11-0.9.0.0/kafka-logs
      
      zookeeper.connect=10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181
      

    Где:

    • log.dirs — директория, где хранятся логи Platform V Corax, которая была создана на шаге (1);

    • host.name — IP-адрес сервера, на котором находится Platform V Corax-брокер;

    • broker.id — идентификатор Platform V Corax-брокера, обязательно должен быть уникальным в рамках кластера Platform V Corax;

    • zookeeper.connect — список адресов Zookeeper-брокеров в формате IP:PORT, где:

      • IP — адрес сервера, на котором выполняется Zookeeper-брокер;

      • PORT — порт, к которому подключаются клиенты Zookeeper — его clientPort.

    Примечание

    Свойство port не указано в файлах, т. к. нет необходимости прописывать его явно: port=9092 используется Platform V Corax по умолчанию.

    При необходимости значение порта можно прописать явно или изменить на другое.

Запуск кластера Platform V Corax из трех узлов

  1. Выполните запуск узла Platform V Corax на сервере Server-1 в режиме демона с выводом логов:

    bin/kafka-server-start.sh -daemon
    
    config/server.properties && tail -f logs/server.log
    
    [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log
    
    [2016-03-28 17:16:50,514] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 11 milliseconds.
    
    [2016-03-28 17:16:50,531] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:16:50,532] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:16:50,537] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:16:50,549] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:16:50,555] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:16:50,556] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(10.116.93.87,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:16:50,571] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:06,309] INFO KafkaConfig values:
    
    advertised.host.name = null
    
    metric.reporters = []
    
    quota.producer.default = 9223372036854775807
    
    offsets.topic.num.partitions = 50
    
    log.flush.interval.messages = 9223372036854775807
    
    auto.create.topics.enable = true
    
    controller.socket.timeout.ms = 30000
    
    log.flush.interval.ms = null
    
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    
    replica.socket.receive.buffer.bytes = 65536
    
    min.insync.replicas = 1
    
    replica.fetch.wait.max.ms = 500
    
    num.recovery.threads.per.data.dir = 1
    
    ssl.keystore.type = JKS
    
    default.replication.factor = 1
    
    ssl.truststore.password = null
    
    log.preallocate = false
    
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    
    fetch.purgatory.purge.interval.requests = 1000
    
    ssl.endpoint.identification.algorithm = null
    
    replica.socket.timeout.ms = 30000
    
    message.max.bytes = 1000012
    
    num.io.threads = 8
    
    offsets.commit.required.acks = -1
    
    log.flush.offset.checkpoint.interval.ms = 60000
    
    delete.topic.enable = false
    
    quota.window.size.seconds = 1
    
    ssl.truststore.type = JKS
    
    offsets.commit.timeout.ms = 5000
    
    quota.window.num = 11
    
    zookeeper.connect = 10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181
    
    authorizer.class.name =
    
    num.replica.fetchers = 1
    
    log.retention.ms = null
    
    log.roll.jitter.hours = 0
    
    log.cleaner.enable = false
    
    offsets.load.buffer.size = 5242880
    
    log.cleaner.delete.retention.ms = 86400000
    
    ssl.client.auth = none
    
    controlled.shutdown.max.retries = 3
    
    queued.max.requests = 500
    
    offsets.topic.replication.factor = 3
    
    log.cleaner.threads = 1
    
    sasl.kerberos.service.name = null
    
    sasl.kerberos.ticket.renew.jitter = 0.05
    
    socket.request.max.bytes = 104857600
    
    ssl.trustmanager.algorithm = PKIX
    
    zookeeper.session.timeout.ms = 6000
    
    log.retention.bytes = -1
    
    sasl.kerberos.min.time.before.relogin = 60000
    
    zookeeper.set.acl = false
    
    connections.max.idle.ms = 600000
    
    offsets.retention.minutes = 1440
    
    replica.fetch.backoff.ms = 1000
    
    inter.broker.protocol.version = 0.9.0.X
    
    log.retention.hours = 168
    
    num.partitions = 1
    
    listeners = null
    
    ssl.provider = null
    
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    
    log.roll.ms = null
    
    log.flush.scheduler.interval.ms = 9223372036854775807
    
    ssl.cipher.suites = null
    
    log.index.size.max.bytes = 10485760
    
    ssl.keymanager.algorithm = SunX509
    
    security.inter.broker.protocol = PLAINTEXT
    
    replica.fetch.max.bytes = 1048576
    
    advertised.port = null
    
    log.cleaner.dedupe.buffer.size = 524288000
    
    replica.high.watermark.checkpoint.interval.ms = 5000
    
    log.cleaner.io.buffer.size = 524288
    
    sasl.kerberos.ticket.renew.window.factor = 0.8
    
    zookeeper.connection.timeout.ms = null
    
    controlled.shutdown.retry.backoff.ms = 5000
    
    log.roll.hours = 168
    
    log.cleanup.policy = delete
    
    host.name = 10.116.93.87
    
    log.roll.jitter.ms = null
    
    max.connections.per.ip = 2147483647
    
    offsets.topic.segment.bytes = 104857600
    
    background.threads = 10
    
    quota.consumer.default = 9223372036854775807
    
    request.timeout.ms = 30000
    
    log.index.interval.bytes = 4096
    
    log.dir = /tmp/kafka-logs
    
    log.segment.bytes = 1073741824
    
    log.cleaner.backoff.ms = 15000
    
    offset.metadata.max.bytes = 4096
    
    ssl.truststore.location = null
    
    group.max.session.timeout.ms = 30000
    
    ssl.keystore.password = null
    
    zookeeper.sync.time.ms = 2000
    
    port = 9092
    
    log.retention.minutes = null
    
    log.segment.delete.delay.ms = 60000
    
    log.dirs = /var/tmp/kafka_2.11-0.9.0.0/kafka-logs
    
    controlled.shutdown.enable = true
    
    compression.type = producer
    
    max.connections.per.ip.overrides =
    
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    
    auto.leader.rebalance.enable = true
    
    leader.imbalance.check.interval.seconds = 300
    
    log.cleaner.min.cleanable.ratio = 0.5
    
    replica.lag.time.max.ms = 10000
    
    num.network.threads = 3
    
    ssl.key.password = null
    
    reserved.broker.max.id = 1000
    
    metrics.num.samples = 2
    
    socket.send.buffer.bytes = 102400
    
    ssl.protocol = TLS
    
    socket.receive.buffer.bytes = 102400
    
    ssl.keystore.location = null
    
    replica.fetch.min.bytes = 1
    
    unclean.leader.election.enable = true
    
    group.min.session.timeout.ms = 6000
    
    log.cleaner.io.buffer.load.factor = 0.9
    
    offsets.retention.check.interval.ms = 600000
    
    producer.purgatory.purge.interval.requests = 1000
    
    metrics.sample.window.ms = 30000
    
    broker.id = 1
    
    offsets.topic.compression.codec = 0
    
    log.retention.check.interval.ms = 300000
    
    advertised.listeners = null
    
    leader.imbalance.per.broker.percentage = 10
    
    (kafka.server.KafkaConfig)
    
    [2016-03-28 17:17:06,377] INFO starting (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:06,382] INFO Connecting to zookeeper on 10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181 (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:06,748] INFO Loading logs. (kafka.log.LogManager)
    
    [2016-03-28 17:17:06,986] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-0/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:17:06,987] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log)
    
    [2016-03-28 17:17:06,988] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log)
    
    [2016-03-28 17:17:06,990] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-1/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:17:06,991] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log)
    
    [2016-03-28 17:17:06,992] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log)
    
    [2016-03-28 17:17:06,994] INFO Logs loading complete. (kafka.log.LogManager)
    
    [2016-03-28 17:17:06,995] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:17:06,996] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:17:07,040] INFO Awaiting socket connections on SBT-IPO-203.ca.sbrf.ru:9092. (kafka.network.Acceptor)
    
    [2016-03-28 17:17:07,043] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer)
    
    [2016-03-28 17:17:07,062] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:07,063] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:07,124] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:07,142] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:07,142] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    
    [2016-03-28 17:17:09,020] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener)
    
    [2016-03-28 17:17:09,051] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:17:09,053] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:09,053] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:17:09,057] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:09,060] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:17:09,075] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:17:09,084] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:17:09,128] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:09,138] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:09,139] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -\> EndPoint(10.116.93.87,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:17:09,151] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:09,409] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [NUMBERS,0],[NUMBERS,1] (kafka.server.ReplicaFetcherManager)
    
    [2016-03-28 17:17:10,584] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions[NUMBERS,0] (kafka.server.ReplicaFetcherManager)
    

    Где tail -f logs/server.log — вывод текущего лога Platform V Corax-брокера из файла в реальном времени.

    Запуск Platform V Corax-брокера на сервере Server-1 выполнен успешно.

  2. Выполните запуск узла Zookeeper на сервере Server-2 в режиме демона с выводом логов:

    bin/kafka-server-start.sh -daemon
    
    config/server.properties && tail -f logs/server.log
    
    [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log
    
    [2016-03-28 17:35:35,726] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,36]
    
    [2016-03-28 17:35:35,728] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,36] in 2 milliseconds.
    
    [2016-03-28 17:36:11,334] INFO KafkaConfig values:
    
    ....
    
    (kafka.server.KafkaConfig)
    
    [2016-03-28 17:36:11,441] INFO starting (kafka.server.KafkaServer)
    
    [2016-03-28 17:36:11,450] INFO Connecting to zookeeper on 10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181 (kafka.server.KafkaServer)
    
    [2016-03-28 17:36:11,870] INFO Loading logs. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,775] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-1/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:36:12,776] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log)
    
    [2016-03-28 17:36:12,777] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log)
    
    [2016-03-28 17:36:12,780] WARN Found an corrupted index file, /var/tmp/kafka_2.11-0.9.0.0/kafka-logs/NUMBERS-0/00000000000000000000.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:36:12,781] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log)
    
    [2016-03-28 17:36:12,782] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log)
    
    [2016-03-28 17:36:12,785] INFO Logs loading complete. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,786] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,799] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,858] INFO Awaiting socket connections on SBT-IPO-204.ca.sbrf.ru:9092. (kafka.network.Acceptor)
    
    [2016-03-28 17:36:12,861] INFO [Socket Server on Broker 2], Started 1 acceptor threads (kafka.network.SocketServer)
    
    [2016-03-28 17:36:13,054] INFO [GroupCoordinator 2]: Starting up. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:36:13,064] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:36:13,064] INFO [GroupCoordinator 2]: Startup complete. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:36:13,073] INFO [Group Metadata Manager on Broker 2]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:36:13,075] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:36:13,087] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:36:13,088] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:36:13,094] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:36:13,119] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:36:13,133] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:36:13,135] INFO Registered broker 2 at path /brokers/ids/2 with addresses: PLAINTEXT -\> EndPoint(10.116.93.88,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:36:13,149] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:36:15,082] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test2,1],[test,14],[test2,13],[test,15],[test,3],[test2,11],[test,6],[test5,1],[test2,5],[ThrashMetal,2],[test,17],[test5,0],[test,5],[test,11],[test3,1],[test2,15],[test2,18],[test2,8],[test,16],[test,1],[NUMBERS,0],[test,4],[test,19],[test2,3],[test2,10],[ThrashMetal,0],[test,18],[test,13],[test2,2],[test2,17],[test,12],[test2,7],[test,2],[test,8],[test2,16],[NUMBERS,1],[test2,6],[test,0],[test2,4],[test2,0],[test,7],[test2,9],[test,9],[superTest,1],[test3,0],[test2,14],[ThrashMetal,1],[superTest,0],[test2,19],[test,10],[test2,12] (kafka.server.ReplicaFetcherManager)
    
    [2016-03-28 17:36:15,096] INFO Truncating log NUMBERS-0 to offset 13. (kafka.log.Log)
    
    [2016-03-28 17:36:15,100] INFO Truncating log NUMBERS-1 to offset 14. (kafka.log.Log)
    
    [2016-03-28 17:36:15,162] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([[NUMBERS,1], ...) initOffset 14 to broker BrokerEndPoint(1,10.116.93.87,9092)]
    
    [2016-03-28 17:36:15,171] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread)
    
    [2016-03-28 17:36:15,354] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,22] (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:36:15,424] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,22] in 69 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:37:14,092] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [NUMBERS,1] (kafka.server.ReplicaFetcherManager)
    

    Запуск Platform V Corax-брокера на сервере Server-2 выполнен успешно.

  3. Выполните запуск узла Platform V Corax на сервере Server-3 в режиме демона с выводом логов:

    bin/kafka-server-start.sh -daemon
    
    config/server.properties && tail -f logs/server.log
    
    [mon99usr\@SBT-IPO-208 kafka_2.11-0.9.0.0]\$ bin/kafka-server-start.sh -daemon config/server-3-servers.properties && tail -f logs/server.log
    
    Kafka application log tail -f from /home/mon99usr/kafka_2.11-0.9.0.0/logs/server.log
    
    [ [2016-03-28 17:45:56,259] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:45:56,280] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:45:56,281] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(10.116.97.221,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:45:56,300] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:50:45,138] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener)
    
    [2016-03-28 17:50:57,090] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:50:57,096] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:50:57,097] INFO 3 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    
    [2016-03-28 17:51:00,733] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener)
    
    [2016-03-28 17:52:37,509] INFO KafkaConfig values:
    
    ...
    
    (kafka.server.KafkaConfig)
    
    [2016-03-28 17:52:37,675] INFO starting (kafka.server.KafkaServer)
    
    [2016-03-28 17:52:37,686] INFO Connecting to zookeeper on 10.116.93.87:3181,10.116.93.88:3181,10.116.97.221:3181 (kafka.server.KafkaServer)
    
    [2016-03-28 17:52:38,083] INFO Loading logs.
    
    [2016-03-28 17:52:38,090] INFO Logs loading complete.
    
    [2016-03-28 17:52:38,091] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:52:38,103] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:52:38,163] INFO Awaiting socket connections on SBT-IPO-208.ca.sbrf.ru:9092. (kafka.network.Acceptor)
    
    [2016-03-28 17:52:38,169] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer)
    
    [2016-03-28 17:52:38,199] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,203] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,378] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:52:38,394] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 13 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:52:38,396] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,397] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:52:38,407] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,432] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:52:38,437] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:52:38,445] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:52:38,485] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:52:38,504] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:52:38,506] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -\> EndPoint(10.116.97.221,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:52:38,524] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
    

    Запуск Platform V Corax-брокера на сервере Server-3 выполнен успешно.

Запуск кластеров Platform V Corax и Zookeeper выполнен успешно.

Тест доступности Zookeeper / работоспособности Platform V Corax при различных условиях работы кластера Zookeeper

  1. Тест работоспособности Platform V Corax при всех работающих узлах кластера Zookeeper.

    • Вывод списка топиков:

      bin/kafka-topics.sh
      
      ---list
      
      --zookeeper 10.116.93.87:3181,10.116.93.87:3181,10.116.93.87:3181
      
      ---topic NUMBERS
      
      NUMBERS
      

      В списке содержится топик NUMBERS.

    • Отправка сообщений в топик NUMBERS:

      bin/kafka-console-producer.sh
      
      --broker-list
      10.116.93.87:9092,10.116.93.88:9092,10.116.97.221:9092
      
      --topic NUMBERS
      
      1
      
      2
      
      3
      

      Запись в топик успешна.

    • Чтение сообщений из топика:

      bin/kafka-console-consumer.sh
      
      --new-consumer
      
      --bootstrap-server 10.116.93.88:9092
      
      --topic NUMBERS
      
      --from-beginning
      
      1
      
      2
      
      3
      

      Чтение из топика успешно.

  2. Тест работоспособности Platform V Corax при отказе одного узла кластера Zookeeper:

    • список топиков выводится;

    • чтение сообщений из топика выполняется успешно;

    • отправка сообщений в топик выполняется успешно.

  3. Тест работоспособности Platform V Corax при отказе двух узлов кластера Zookeeper:

    • список топиков не выводится;

    • чтение сообщений из топика выполняется успешно;

    • отправка сообщений в топик выполняется успешно.

  4. Тест работоспособности Platform V Corax при полном отказе кластера Zookeeper:

    • список топиков не выводится;

    • чтение сообщений из топика выполняется успешно;

    • отправка сообщений в топик выполняется успешно.

Конфигурирование Platform V Corax#

Конфигурирование узла (брокера) Platform V Corax#

Конфигурирование брокера Platform V Corax осуществляется в файле config/server.properties.

Допускается создание файлов конфигурации с различными именами, например:

    config/server-1-node-cluster.properties
    config/server-3-node-cluster.properties
    config/server-3-node-cluster-on-1-machine.properties
    config/server-5-node-cluster.properties

При запуске Platform V Corax достаточно указать нужный конфигурационный файл.

Минимальная конфигурация#

Пример файла конфигураций server.properties для одного брокера:

broker.id=1
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs

Шаги создания конфигурационного файла:

  1. Присвойте ключу broker.id необходимый числовой идентификатор. Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.

    Пример

    broker.id=1

  2. Для ключа zookeeper.connect укажите IP:PORT сервиса Zookeeper.

    Пример

    Запущен один экземпляр Zookeeper и его порт clientPort=2181:

    zookeeper.connect=localhost:2181

    Пример

    Запущен кластер из нескольких экземпляров Zookeeper (адреса всех служб Zookeeper указаны через запятую):

    zookeeper.connect=10.168.1.1:2181,10.168.1.2:2181

  3. Для ключа log.dirs задайте одну или несколько директорий, в которых планируется хранение сообщений, передаваемых в Platform V Corax.

    Пример

    log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs

    Примечание

    Если планируется использовать Platform V Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.

    Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Platform V Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:

    log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logs

  4. Для ключа listeners присвойте строку формата <ПРОТОКОЛ>://<хост>:<порт>:

    Пример

    listeners=PLAINTEXT://localhost:9092
    listeners=SSL://10.116.93.88:9092
    listeners=SASL_PLAINTEXT://:9092
    
Расширенная конфигурация#

Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Наиболее важные из них приведены далее.

  • Если на одном физическом сервере планируется запускать несколько брокеров Platform V Corax, то для каждого брокера необходимо также указать уникальный порт, заданный в ключе port. По умолчанию сервер Platform V Corax слушает порт 9092.

  • Можно изменить значение количества партиций для вновь создаваемых топиков по умолчанию в ключе num.partitions. Если для сервера Platform V Corax доступны несколько жестких дисков, увеличение числа партиций увеличит производительность Platform V Corax. Количество партиций по умолчанию можно указать равным числу дисков.

    Пример:

    num.partitions=4

  • Для повышения надежности необходимо раскомментировать записи, содержащие ключи log.flush.interval.messages и log.flush.interval.ms. В этом случае указывается, как часто должна происходить реальная запись сообщений на диск: либо по истечении заданного времени, либо после достижения заданного количества незаписанных сообщений. Данную настройку можно изменить для конкретного топика при его создании.

    Пример:

    log.flush.interval.messages=10000
    log.flush.interval.ms=1000
    
  • Для ключа log.retention.hours задается время хранения партиций в часах, по истечении которого партиция будет доступна для удаления. В данном случае отсчет будет вестись по самому новому сообщению в партиции.

    Пример

    log.retention.hours=168

  • Для ключа log.cleaner.enable задается значение true для активации механизма удаления старых партиций.

    Пример

    log.cleaner.enable=true

Запуск узлов кластера Platform V Corax#

Выполнив вышеуказанные настройки, можно запускать узлы кластера Platform V Corax.

Отдельные узлы можно запускать в любое время, т. е. запуск отдельного узла сервера Platform V Corax не зависит от остальных, как это было с кластером Zookeeper.

Запуск каждого отдельного сервера Platform V Corax выполняется командой:

bin/kafka-server-start.sh config/server.properties

При запуске всех узлов кластера Platform V Corax в консолях запуска должны быть следующие записи:

INFO [Kafka Server ID], started (kafka.server.KafkaServer)

Где ID — идентификатор сервера Platform V Corax, указанный в ключе broker.id.

Сетевые настройки#

Название

Описание

Тип

Значение по умолчанию

message.max.bytes

Максимальный размер сообщений, принимаемых сервером. Пример: message.max.bytes=1000012

int

1000012

num.network.threads

Количество сетевых потоков, используемых сервером для обработки сетевых запросов. Пример: num.network.threads=3

int

3

queued.max.requests

Лимит запросов. Если лимит превышен, начнется блокировка сетевых запросов. Пример: message.max.bytes=500

int

500

replica.socket.receive.buffer.bytes

Размер (в байтах) буфера сокета для приема сетевых запросов. Пример: replica.socket.receive.buffer.bytes=65536

int

65536

replica.socket.timeout.ms

Тайм-аут (в миллисекундах) сокета для сетевых запросов. Пример: replica.socket.timeout.ms=102400

int

30000

request.timeout.ms

Максимальное количество времени, которое уходит на ожидание клиентом ответа на отправленный запрос. Если в течение этого времени ответ на запрос не получен, клиент отправит повторный запрос. Если лимит повторных запросов превышен, то запросы будут прекращены. Пример: request.timeout.ms=102400

int

30000

socket.receive.buffer.bytes

Размер буфера в байтах на прием данных через сокет. Пример: socket.receive.buffer.bytes=102400

int

102400

socket.request.max. bytes

Максимальное количество байтов на сокет-запрос. Пример: socket.request.max.bytes=104857600

int

104857600

socket.send.buffer.bytes

Размер буфера в байтах на передачу данных через сокет. Пример: socket.send.buffer.bytes =102400

int

102400

connections.max.idle.ms

Тайм-аут неудачных попыток соединения в миллисекундах. Пример: connections.max.idle.ms=60000

long

60000

controller.socket.timeout.ms

Тайм-аут сокета в миллисекундах для Controller-to-Broker каналов. Пример: controller.socket.timeout.ms=30000

int

30000

max.connections.per.ip

Максимальное количество соединений с каждого IP-адреса. Пример: max.connections.per.ip=2147483647

int

2147483647

max.connections.per.ip.overrides

Настройка, позволяющая переопределять заданное максимальное количество соединений max.connections.per.ip, варьируя его для каждого IP-адреса. Пример: max.connections.per.ip.overrides=10.116.93.87:5,10.116.93.88:,10.116.97.221:45

string

""

Настройки подключения к Zookeeper#

Запуск Platform V Corax невозможен без подключения к Zookeeper.

Для подключения Platform V Corax-брокеров к кластеру Zookeeper в файле конфигураций server.properties каждого Platform V Corax-брокера в свойство zookeeper.connect необходимо прописать список всех хостов Zookeeper-брокеров в формате IP:PORT, разделяя их запятыми. Где:

  • IP — адрес хоста, на котором работает Zookeeper-брокер;

  • PORT — порт (clientPort в файле zookeeper.properties) для подключения клиентов Zookeeper — Platform V Corax-брокеров.

Для всех остальных настроек выставятся значения по умолчанию, если они не будут указаны явно в файле конфигураций. Список всех известных настроек Zookeeper, относящихся к файлу конфигураций server.properties Platform V Corax-брокера, и их значений по умолчанию приведен в таблице ниже.

Название

Описание

Тип

Значение по умолчанию

zookeeper.connect

Список адресов Zookeeper-брокеров в формате IP:PORT, где: IP — адрес хоста Zookeeper; PORT — порт клиентов Zookeeper (clientPort). Пример: zookeeper.connect=10.116.93.87:3181, 10.116.93.88:3181,10.116.97.221:3181

string

""

zookeeper.connection.timeout.ms

Максимальное время ожидания клиентом соединения с Zookeeper. Если это значение не выставлено, то используется zookeeper.session.timeout.ms. Пример: zookeeper.connection.timeout.ms=6000

int

null

zookeeper.session.timeout.ms

Тайм-аут сессии Zookeeper в миллисекундах. Пример: zookeeper.session.timeout.ms=6000

int

6000

zookeeper.set.acl

Принудить клиента использовать безопасные ACL (Access Control Lists — списки контроля доступа). Пример: zookeeper.set.acl=false

boolean

false

zookeeper.sync.time.ms

Продолжительность времени в миллисекундах, через которое происходит синхронизация Zookeeper-последователя с Zookeeper-лидером. Пример: zookeeper.sync.time.ms=2000

int

2000

Настройки диска#

Название

Описание

Тип

Значение по умолчанию

log.dir

Одна директория, в которой хранятся данные Platform V Corax — логи сообщений. В случае если в файле конфигураций не был указан путь хранения данных, Platform V Corax разместит логи в /tmp/kafka-logs. Пример: log.dir=/tmp/kafka-logs

string

/tmp/kafka-logs

log.dirs

Несколько директорий, в которых хранятся данные Platform V Corax — логи сообщений. Это свойство является дополнением к свойству log.dirs. Если планируется использовать Platform V Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории. Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Platform V Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую. Пример: log.dirs=/var/log/kafka/kafka-logs,/another\disk/kafka/kafka-logs

string

null

log.flush.interval.messages

Интервал, выражаемый количеством сообщений, накапливающихся в логе партиции до момента сохранения этого лога на диск. Если в логе накопилось сообщений меньше, чем задано в этом свойстве, то в случае сбоя (например, перезапуска ОС) эти сообщения будут потеряны на данной машине, поскольку хранились в оперативной памяти и не были сохранены на диск. Пример: log.flush.interval.messages=10000

long

9223372036854775807

log.flush.interval.ms

Максимальное время (в миллисекундах) хранения сообщения любого топика в оперативной памяти до того, как оно будет сохранено на диск. Если свойство не задано, будет использоваться значение из другого свойства — log.flush.scheduler.interval.ms. Пример: log.flush.interval.ms=1000

long

null

log.flush.offset.checkpoint.interval.ms

Частота, с которой происходит обновление персистентной записи, сохраненной в ходе последней сессии записи на диск. Эта частота служит отправной точкой восстановления лога. Пример: log.flush.offset.checkpoint.interval.ms=10

int

60000

log.flush.scheduler.interval.ms

Частота в миллисекундах, с которой данные каждого лога сохраняются на диск. Пример: log.flush.scheduler.interval.ms=10

long

9223372036854775807

log.retention.bytes

Максимальный размер лога перед его удалением. Пример: log.retention.bytes =100

long

-1

log.retention.hours

Время (в часах) хранения лог-файла до его удаления. Пример: log.retention.hours=168

int

168

log.retention.minutes

Время (в минутах) хранения лог-файла до его удаления. Если это свойство не задано, будет использоваться значение из log.retention.hours. Пример: log.retention.minutes=100000

int

null

log.retention.ms

Время (в миллисекундах) хранения лог-файла до его удаления (третье по важности после свойства log.retention.minutes). Если это свойство не задано, будет использоваться значение из log.retention.minutes. Пример: log.retention.ms=6000

long

null

log.roll.hours

Циркуляция логов (в часах) — максимальное количество времени до записи нового сегмента лога. Это свойство имеет более низкий приоритет, чем аналогичное свойство log.roll.ms. Пример: log.roll.hours=10

int

168

log.roll.jitter.hours

Максимальный джиттер (случайные отклонения для циркуляции логов) (в часах) для вычитания из значения logRollTimeMillis (в миллисекундах). Вторично по отношению к свойству log.roll.jitter.ms. Пример: log.roll.jitter.hours=1

int

0

log.roll.ms

Циркуляция логов (в миллисекундах) — максимальное количество времени до записи нового сегмента лога. Если свойство не задано, используется значение из log.roll.hours. Пример: log.roll.ms=10

long

null

log.segment.bytes

Максимальный размер одного лог-файла. Пример: log.segment.bytes=250

int

1073741824

log.segment.delete.delay.ms

Количество времени (в миллисекундах) до удаления файла из файловой системы. Пример: log.roll.delete.delay.ms=10

long

60000

log.cleaner.backoff.ms

Количество времени на простаивание до появления логов для удаления. Пример: log.cleaner.backoff.ms=10

long

15000

log.cleaner.dedupe.buffer.size

Общий размер памяти, выделяемой для удаления дубликатов во всех потоках службы «Log Cleaner». Пример: log.cleaner.dedupe.buffer.size=10

long

134217728

log.cleaner.delete.retention.ms

Количество времени сохранения удаленных записей. Пример: log.cleaner.delete.retention.ms=10

long

86400000

log.cleaner.enable

Включение службы «Log Cleaner». Должно иметь значение true, если есть какой-либо топик со свойством cleanup.policy=compact, включая топик с внутренними оффсетами. Если же значение установлено в false, то эти топики не будут компактными и будут постоянно увеличиваться в размере. Пример: log.cleaner.enable=true

boolean

true

log.cleaner.io.buffer.load.factor

Коэффициент загрузки буфера при удалении дубликатов процессом «Log Cleaner». Например, при коэффициенте, равном 0.9, степень загрузки буфера — 90%. Чем больше значение, тем большее количество логов будет удаляться за один раз, однако вместе с этим будет возрастать и число хеш-коллизий. Пример: log.cleaner.io.buffer.load.factor=0.9

double

0.9

log.cleaner.io.buffer.size

Общий объем памяти, используемый службой «Log Cleaner» на операции ввода/вывода для всех потоков этой службы. Пример: log.cleaner.io.buffer.size=255

int

524288

log.cleaner.io.max.bytes.per.second

Ограничение операций ввода/вывода службы «Log Cleaner» так, чтобы сумма байтов при операциях ввода и вывода была всегда меньше значения, указанного в этом свойстве. Пример: log.cleaner.io.max.bytes.per.second=5.0

double

1.7976931348623157E308

log.cleaner.min.cleanable.ratio

Минимальное соотношение между «грязным» и общим логом для определения пригодности лога для удаления. Пример: log.cleaner.min.cleanable.ratio=0.5

double

0.5

log.cleaner.threads

Число фоновых потоков, используемых для чистки логов службой «Log Cleaner». Пример: log.cleaner.threads=1

int

1

log.cleanup.policy

Политика очистки сегментов логов, находящихся за пределами сохраняющего окна «retention window». Возможны два режима: delete — удаление лога, compact — сжатие лога. Пример: log.cleanup.policy=delete

string

delete

log.index.interval.bytes

Интервал (в байтах), через который прибавляется запись к индексу оффсета. Пример: log.index.interval.bytes=4096

int

4096

log.index.interval.bytes

Интервал (в байтах), через который прибавляется запись к индексу оффсета. Пример: log.index.interval.bytes=4096

int

4096

log.index.size.max.bytes

Максимальный размер индекса оффсета в байтах. Пример: log.index.size.max.bytes=10485760

int

10485760

log.preallocate

Требуется ли создавать файл при выделении нового сегмента. Для Kafka на Windows рекомендуется установить значение true. Пример: log.preallocate=false

boolean

false

log.retention.check.interval.ms

Частота в миллисекундах, с которой служба «Log Cleaner» проверяет каждый лог на необходимость его удаления. Пример: log.retention.check.interval.ms=300000

long

300000

Настройки топиков#

Название

Описание

Тип

Значение по умолчанию

cleanup.policy

Возможны два режима: delete — удаление лога, compact — сжатие лога. Пример: cleanup.policy=delete

log.cleanup.policy

delete

delete.retention.ms

log.cleaner.delete.retention.ms

86400000 (24 hours)

flush.messages

log.flush.interval.messages

None

flush.ms

log.flush.interval.messages

None

index.interval.bytes

log.flush.interval.ms

4096

max.message.bytes

message.max.bytes

1000000

min.cleanable.dirty.ratio

log.cleaner.min.cleanable.ratio

0.5

min.insync.replicas

min.insync.replicas

1

retention.bytes

log.retention.bytes

None

retention.ms

Максимальное время (в миллисекундах) хранения логов до того, как старые сегменты лога будут удалены для освобождения дискового пространства. Свойство работает только в том случае, если cleanup.policy=compact

log.retention.ms

7 days

segment.bytes

Размер лог-файла в байтах. Сохраняется и удаляется всегда по одному файлу за один раз

log.segment.bytes

1 GB

segment.index.bytes

Размер индекса, который отображает оффсеты на позиции в файлах

log.index.size.max.bytes

10 MB

segment.ms

log.roll.hours

7 days

segment.jitter.ms

log.roll.jitter.{ms,hours}

0

Конфигурирование кластера Platform V Corax#

Конфигурирование кластера серверов Platform V Corax, как и конфигурирование одного узла, осуществляется в файле server.properties на каждом сервере, который планируется включить в кластер.

Минимальная конфигурация кластера#

Пример файла конфигураций server.properties для кластера Platform V Corax из двух брокеров (приведен пример конфигурации для одного из брокеров):

broker.id=1

host.name=10.168.1.1

zookeeper.connect=10.168.1.1:3181,10.168.1.2:3181

log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs

Шаги создания конфигурационного файла:

  1. Присвойте ключу broker.id необходимый числовой идентификатор.

    Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.

    Пример

    На первом сервере Platform V Corax:

    broker.id=1

    На втором сервере:

    broker.id=2

  2. Присвойте ключу host.name IP-адрес сервера.

    Пример

    host.name=10.168.1.1

    Если на одном физическом сервере планируется запускать несколько серверов Platform V Corax, то для каждого сервера необходимо также указать уникальный порт, заданный в ключе port. По умолчанию сервер Platform V Corax слушает порт 9092.

  3. Для ключа zookeeper.connect укажите IP и PORT сервиса Zookeeper в формате zookeeper.connect=IP:PORT. Если кластер Zookeeper запущен, то укажите адреса Zookeeper-сервисов через запятую:

    Пример

    zookeeper.connect=10.168.1.1:2181,10.168.1.2:2181

  4. Для ключа log.dirs задайте директорию, в которой планируется хранение сообщений, передаваемых в Platform V Corax.

    Пример

    log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs

    Примечание

    Если планируется использовать Platform V Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.

    Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Platform V Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:

    log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logs

На данном этапе минимальная конфигурация узлов кластера Platform V Corax завершается. Если не существует каких-либо ошибок в конфигурации, а также Zookeeper сконфигурирован корректно и запущен, то этого вполне достаточно для успешного запуска и работы кластера Platform V Corax.

Расширенная конфигурация кластера#
Конфигурирование Производителя#

Минимальная конфигурация

Пример файла конфигураций producer.properties для одного брокера:

acks=1

bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102,
`hostname --f`:9103

Шаги создания конфигурационного файла:

  1. Присвойте ключу acks значение, указывающее, что сообщение отправлено:

    • 0 — сообщение расположено в TCP-буфере для отправки по сети;

    • 1 — сообщение успешно записано в партицию-лидер;

    • 1 (all) — сообщение успешно записано на все ISR (InSyncReplicas параметр на Kafka broker (server.properties))

    Пример

    acks=1

  2. Для ключа bootstrap.servers укажите IP:PORT сервиса Platform V Corax.

    Пример

    Если запущены 3 экземпляра Platform V Corax на том же сервере:

    bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103
    

    Допускается также указывать IP-адреса, полные доменные имена.

Расширенная конфигурация

Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.

Запуск каждого отдельного Производителя Platform V Corax выполняется командой:

./bin/kafka-console-producer --broker-list `hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103 --topic test1 --producer.config etc/kafka/producer.properties

Название

Описание

Тип

Значение по умолчанию

key.serializer

Класс сериализатора для ключа, реализующего org.apache.kafka.common.serialization.Serializerinterface

class

Неприменимо

value.serializer

Класс сериализатора для значения, реализующего org.apache.kafka.common.serialization.Serializerinterface

class

Неприменимо

acks

Количество подтверждений, которые производитель должен получить от лидера, прежде чем считать запрос завершенным. Это определяет долговечность записей, которые будут отправлены. Разрешены следующие настройки:
1) acks=0 — производитель не будет ждать подтверждения от сервера. Запись будет немедленно добавлена в буфер сокета и будет считаться отправленной. В этом случае нельзя гарантировать, что сервер получил запись, и конфигурация повторных попыток не вступит в силу (поскольку клиент обычно не знает о каких-либо сбоях). Смещение, заданное для каждой записи, всегда будет равно -1;
2) acks=1 — означает, что лидер запишет в свой локальный журнал, но ответит, не дожидаясь полного подтверждения от всех ISR (In-Sync Replicas — все реплики партиции, синхронизированные с лидером). Если лидер партиции завершит работу с ошибкой после подтверждения записи, но до того как реплицируются записи на ISR, запись будет потеряна;
3) acks=all — лидер будет ждать полного набора синхронизированных реплик (ISR), чтобы подтвердить запись. Это гарантирует, что запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается в рабочем состоянии. Это самая сильная

string

1

bootstrap.servers

Список пар «хост: порт», используемых для установления начального соединения с кластером Platform V Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде: host1:port1, host2:port2, … Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (можно указать более одного, на случай, если указанный сервер не работает)

list

""

buffer.memory

Общее количество байтов памяти, которое производитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, производитель блокируется на время, заданное параметром max.block.ms, после чего он выдаст исключение. Этот параметр должен примерно соответствовать общей памяти, которую будет использовать производитель, но не является жесткой привязкой, поскольку не вся память, используемая производителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержания inflight-запросов

long

33554432

compression.type

Тип сжатия для всех данных, генерируемых производителем. Допустимые значения: none (без сжатия), gzip, snappy, lz4 или zstd. Сжатие полных партий данных, поэтому эффективность разбивки на порции данных также повлияет на степень сжатия (большее дозирование означает лучшее сжатие)

string

none

retries

Установка значения больше нуля будет означать для клиента переотправку любой записи, отправка которой завершилась предположительно временной ошибкой. Эта попытка не отличается от случая, когда клиент получает ошибку. Разрешение повторов (retries) без установки параметра max.in.flight.requests.per.connection=1 потенциально изменит порядок записей, т. к. если два пакета отправляются в один раздел и первый пакет вызывает сбой и повторяется, но второй успешно записан, то запись о втором пакете может появиться первой. Запросы на запись будут завершены ошибкой до того, как количество попыток будет исчерпано, а если тайм-аут настроен, delivery.timeout.msexpires будет достигнут до успешного подтверждения. Пользователи обычно предпочитают не указывать этот параметр, а использовать delivery.timeout.msexpires для управления поведением повторной попытки

int

2,15E+09

ssl.key.password

Пароль закрытого ключа в файле keystore. Это необязательно для клиента

password

null

ssl.keystore.location

Расположение закрытого ключа в файле keystore. Это необязательно для клиента

string

null

ssl.keystore.password

Пароль хранилища для файла хранилища ключей (keystore). Это необязательно для клиента и требуется, если используется параметр ssl.keystore.location

password

null

ssl.truststore.location

Расположение файла хранилища доверительных сертификатов (truststore)

string

null

ssl.truststore.password

Пароль для файла хранилища доверительных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, но проверка целостности отключена

password

null

batch.size

При отправке нескольких записей в один раздел производитель попытается объединить записи в меньшее количество запросов. Это повышает производительность как на клиенте, так и на сервере. Этот параметр управляет размером пакета по умолчанию в байтах. Не будет предпринята попытка объединения записей в пакеты большего размера. Запросы, отправленные брокерам, будут содержать несколько пакетов, по одному для каждой партиции с доступными для отправки данными. Небольшой размер пакета сделает дозирование менее распространенным и может уменьшить пропускную способность (нулевой размер пакета полностью отключит дозирование). Очень большой размер пакета может использовать память немного более расточительно, так как всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей

int

16384

client.dns.lookup

Параметр управляет тем, как клиент использует DNS lookup. Если установлено значение use\all\dns\ips, то, когда поиск возвращает несколько IP-адресов для имени хоста, все они будут пытаться подключиться до сбоя подключения. Применяется как к bootstrap, так и к advertised серверам. Если установлено значение resolve\canonical\bootstrap\servers\only, каждая запись будет разрешена и расширена в список канонических имен

string

default

client.id

Строка id для передачи серверу при выполнении запросов. Позволяет отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в журнал запросов на стороне сервера

string

""

connections.max.idle.ms

Закрытие неактивных соединений после количества миллисекунд, указанных для этого параметра

long

540000

delivery.timeout.ms

Верхняя граница времени для отчета об успехе или сбое после получения ответа на вызов send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (acks) (если ожидается) и время, разрешенное для повторных отправок при сбоях. Producer может сообщить об ошибке отправки записи раньше, чем это обозначено в конфигурации, если обнаружена неустранимая ошибка, количество повторных попыток отправки было исчерпано или запись добавлена в пакет, который достиг окончания срока доставки (deadline) раньше. Значение этой конфигурации должно быть больше или равно сумме request.timeout.ms + linger.ms

int

120000

linger.ms

Производитель группирует все записи, поступающие между запросами передачи, в один пакетированный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем они могут быть отправлены. Однако в некоторых случаях клиенту может понадобиться уменьшить количество запросов даже при умеренной нагрузке. Рассматриваемая настройка позволяет это сделать, добавляя небольшое количество искусственной задержки, то есть вместо немедленной отправки записи производитель будет ждать до заданной задержки, чтобы разрешить отправку других записей и отправить их вместе. Это можно рассматривать как аналог алгоритма Нэгла в TCP. Параметр определяет верхнюю границу задержки для дозирования: как только получен batch.size записей для партиции, они будут отправлены немедленно независимо от этого параметра, однако если количество байтов меньше, чем накопленных для этого раздела, будет выполняться задержка в течение указанного времени, ожидая появления большего количества записей. Этот параметр по умолчанию равен 0 (т. е. без задержки). Установка linger.ms=5, например, будет иметь эффект уменьшения количества отправленных запросов, но добавит до 5 мс задержки к записям, отправленным в отсутствие нагрузки

int

0

max.block.ms

Параметр определяет, как долго KafkaProducer.send() и KafkaProducer.partitionsFor() будет блокировать. Эти методы могут быть заблокированы, так как буфер заполнен или метаданные недоступны. Блокировка в пользовательских сериализаторах или разделителе не будет учитываться в течение этого времени ожидания

long

60000

max.request.size

Максимальный размер запроса в байтах. Этот параметр ограничивает количество партий записей, которые производитель отправляет в одном запросе, чтобы избежать отправки огромных запросов. Это также эффективно ограничивает максимальный размер пакета записи. Обратите внимание, что сервер имеет свой собственный размер пакета записи, который может отличаться от заданного в данном параметре

int

1048576

partitioner.class

Класс разделителя, реализующий org.apache.kafka.clients.producer.Partitioner-интерфейс

class

org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytes

Размер буфера приема TCP (SO_RCVBUF ) для использования при чтении данных. Если значение равно -1, будет использоваться ОС по умолчанию

int

32768

request.timeout.ms

Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны. Значение должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера) для уменьшения возможности дублирования сообщений из-за ненужных попыток производителя

int

30000

sasl.client.callback.handler.class

Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler

class

null

sasl.jaas.config

Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: loginModuleClass controlFlag (optionName=optionValue)\*;. Для брокеров конфигурация должна иметь префикс слушателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required

password

null

sasl.kerberos.service.name

Имя участника (принципала) Kerberos, под которым работает Кафка. Это можно определить либо в конфигурации JAAS Кафки, либо в конфигурации Кафки

string

null

sasl.login.callback.handler.class

Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс AuthenticateCallbackHandler. Для брокеров конфигурация обработчика обратного вызова входа (login callback handler) должна иметь префикс прослушивателя (listener) и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class

null

sasl.login.class

Полное имя класса, который определяет интерфейс входа. Для брокеров конфигурация входа должна быть с префиксом listener'а и именем механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin

class

null

sasl.mechanism

Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого доступен поставщик безопасности. GSSAPI является механизмом по умолчанию

string

GSSAPI

security.protocol

Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL

string

PLAINTEXT

send.buffer.bytes

Размер буфера отправки TCP (SO_SNDBUF) для использования при отправке данных. Если значение равно -1, будет использоваться ОС по умолчанию

int

131072

ssl.enabled.protocols

Список протоколов, разрешенных для SSL-соединений

list

TLSv1.2,TLSv1.1,TLSv1

ssl.keystore.type

Формат файла хранилища ключей (key store). Параметр необязателен для клиента

string

JKS

ssl.protocol

Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимые значения: TLSv1.1 и TLSv1.2. SSL, SSLv2 и SSLv3 могут поддерживаться в старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности

string

TLS

ssl.provider

Имя поставщика безопасности, используемого для SSL-соединений. Значение по умолчанию — значение, установленное по умолчанию в JVM

string

null

ssl.truststore.type

Формат файла хранилища доверия (trust store)

string

JKS

enable.idempotence

Если задано значение true, производитель гарантирует, что в поток записывается ровно одна копия каждого сообщения. Если false, производитель повторяет попытку из-за сбоев брокера и т. д., может писать дубликаты повторенного сообщения в потоке. Обратите внимание, что для включения идемпотенции требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5, количество попыток переотправки (retries) было больше 0, а ack=all. Если эти значения явно не заданы пользователем, будут выбраны подходящие значения. Если установлены несовместимые значения, будет создано исключение ConfigException

boolean

false

interceptor.classes

Список классов для использования в качестве перехватчиков. Реализовывает org.apache.kafka.clients.producer.ProducerInterceptor-интерфейс, позволяющий перехватывать (и, возможно, изменять) записи, полученные производителем до их публикации в кластере Кафки. По умолчанию перехватчиков нет

list

""

max.in.flight.requests.per.connection

Максимальное количество неподтвержденных запросов, отправляемых клиентом по одному соединению перед блокировкой. Если этот параметр больше 1 и есть неудачные отправки, существует риск переупорядочения сообщений из-за повторных попыток (т. е. если повторные попытки включены)

int

5

metadata.max.age.ms

Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений в руководстве разделов, чтобы проактивно обнаружить новые брокеры или разделы

long

300000

metric.reporters

Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporter интерфейса позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX

list

""

metrics.num.samples

Количество выборок, сохраняемых для вычисления метрик

int

2

metrics.recording.level

Самый высокий уровень записи для метрик

string

INFO

metrics.sample.window.ms

Окно времени, в котором вычисляется выборка метрик

long

30000

reconnect.backoff.max.ms

Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного дрожания, чтобы избежать штормов соединения

long

1000

reconnect.backoff.ms

Базовое время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру

long

50

retry.backoff.ms

Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя

long

100

sasl.kerberos.kinit.cmd

Путь к команде Kerberos kinit

string

/usr/bin/kinit

sasl.kerberos.min.time.before.relogin

Время сна потока входа между попытками обновления

long

60000

sasl.kerberos.ticket.renew.jitter

Процент случайного jitter'а, добавленного к времени обновления

double

0.05

sasl.kerberos.ticket.renew.window.factor

Поток входа будет спать до тех пор, пока не будет достигнут указанный коэффициент времени окна от последнего обновления до истечения срока действия билета, после чего он попытается обновить билет

double

0.8

sasl.login.refresh.buffer.seconds

Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER

short

300

sasl.login.refresh.min.period.seconds

Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER

short

60

sasl.login.refresh.window.factor

Поток обновления входа будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%). В настоящее время применяется только к носителю OAUTHBEARER

double

0.8

sasl.login.refresh.window.jitter

Максимальное количество случайного дрожания относительно времени жизни учетных данных, добавленного ко времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно. Значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER

double

0.05

ssl.cipher.suites

Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров

list

null

ssl.endpoint.identification.algorithm

Алгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера

string

https

ssl.keymanager.algorithm

Алгоритм, используемый фабрикой key manager для SSL-соединений. Значение по умолчанию — это алгоритм фабрики Key manager, настроенный для виртуальной машины Java

string

SunX509

ssl.secure.random.implementation

Реализация SecureRandom PRNG, используемая для операций шифрования SSL

string

null

ssl.trustmanager.algorithm

Алгоритм, используемый фабрикой trust manager для SSL-соединений. Значение по умолчанию — это заводской алгоритм trust manager, настроенный для виртуальной машины Java

string

PKIX

transaction.timeout.ms

Максимальное время в миллисекундах, в течение которого координатор транзакций будет ждать обновления статуса транзакции от производителя, прежде чем проактивно прервать текущую транзакцию. Если это значение больше, чем transaction.max.timeout.ms в брокере, запрос завершится ошибкой InvalidTransactionTimeout

int

60000

transactional.id

Идентификатор транзакции, используемый для доставки транзакций. Это включает семантику надежности, которая охватывает несколько сеансов производителя, так как это позволяет клиенту гарантировать, что транзакции с использованием того же TransactionalId были завершены до начала любых новых транзакций. Если идентификатор транзакции не указан, производитель ограничивается идемпотентной доставкой. Обратите внимание, что enable.idempotencemust должна быть включена, если настроен идентификатор транзакции. Значение по умолчанию равно null, что означает невозможность использования транзакций. По умолчанию для транзакций требуется кластер по крайней мере из трех брокеров, что является рекомендуемым параметром для производства; для разработки можно изменить это, настроив параметр брокера transaction.state.log.replication.factor

string

null

Конфигурирование Потребителя#

Минимальная конфигурация#

Пример файла конфигураций consumer.properties для одного брокера:

group.id=test-consumer-group

bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102,
`hostname --f`:9103

Шаги создания конфигурационного файла:

  1. Присвойте ключу group.id необходимый идентификатор группы, под которым будет запущен consumer. Можно запускать несколько consumer'ов под одним и тем же group.id, что позволит считывать данные в несколько потоков (с нескольких серверов), обеспечивая отсутствие дублирования вычитываемых данных.

    Пример

    group.id=test-consumer-group

  2. Для ключа bootstrap.servers укажите IP:PORT сервиса Platform V Corax.

    Пример

    Если запущены 3 экземпляра Platform V Corax на том же сервере:

    bootstrap.servers=`hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103
    

    Допускается также указывать IP-адреса, полные доменные имена.

Расширенная конфигурация#

Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.

Запуск каждого отдельного Потребителя Platform V Corax выполняется командой:

./bin/kafka-console-consumer --broker-list `hostname --f`:9101, `hostname --f`:9102, `hostname --f`:9103 --topic test1 --consumer.config etc/kafka/consumer.properties --from-beginning

Название

Описание

Тип

Значение по умолчанию

key.deserializer

Класс десериализатора для ключа, реализующего org.apache.kafka.common.serialization.Deserializerinterface

class

value.deserializer

Класс десериализатора для значения, реализующего org.apache.kafka.common.serializationDeserializerinterface

class

bootstrap.servers

Список пар «хост: порт», используемых для установления начального соединения с кластером Platform V Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде host1: port1, host2:port2,… Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (вы можете указать более одного, на случай, если указанный сервер не работает)

list

""

fetch.min.bytes

Минимальный объем данных, который сервер должен вернуть для запроса на выборку (fetch request). Если данных недостаточно, запрос будет ждать, пока накопится необходимое количество данных, прежде чем ответить на запрос. Значение по умолчанию 1 байт означает, что запросы на выборку отвечают, как только доступен один байт данных или время ожидания запроса на выборку данных заканчивается. Установка этого значения больше 1 заставит сервер ждать накопления больших объемов данных, что может немного повысить пропускную способность сервера за счет некоторой дополнительной задержки

int

1

group.id

Уникальная строка, идентифицирующая группу потребителей, к которой принадлежит данный потребитель. Это свойство является обязательным, если потребитель использует функции управления группами с помощью subscribe (topic) или стратегии управления смещением на основе Platform V Corax

string

null

heartbeat.interval.ms

Ожидаемое время между heartbeats (передачей сигнала «я жив») координатору потребителя при использовании средств управления группы Кафки. Передача сигнала используется для обеспечения активного сеанса потребителя и облегчения перебалансировки, когда новые потребители присоединяются или покидают группу. Значение должно быть установлено ниже session.timeout.ms, однако обычно должно устанавливается не выше 1/3 от session.timeout.ms. Его можно уменьшать с целью большего контроля предполагаемого времени для нормальных перебалансировок

int

3000

max.partition.fetch.bytes

Максимальный объем данных на партицию, который будет возвращен сервером. Записи извлекаются потребителем пакетами (batch). Если первый пакет записей в первом непустом разделе выборки больше этого предела, пакет все равно будет возвращен, чтобы гарантировать, что потребитель может добиться успеха по извлечению данных. Максимальный размер пакета записей, принятый брокером, определяется посредством message.max.bytes (настройки брокеров) или max.message.bytes (настройки топиков) См. fetch.max.bytes для ограничения размера запроса потребителя

int

1048576

session.timeout.ms

Тайм-аут, используемый для обнаружения сбоев потребителей при использовании средства управления группами Platform V Corax. Потребитель посылает периодические heartbeats, чтобы сообщить брокеру что он жив (не завис). Если до истечения этого тайм-аута брокер не получит heartbeat, то брокер удалит этого потребителя из группы и инициирует перебалансировку. Значение должно находиться в допустимом диапазоне, настроенном в конфигурации брокера group.min.session.timeout.ms и group.max.session.timeout.ms

int

10000

ssl.key.password

Пароль закрытого ключа в файле keystore. Необязательный параметр для клиента

password

null

ssl.keystore.location

Расположение файла хранилища ключей keystore. Необязательный параметр для клиента и может использоваться для двусторонней проверки подлинности (authentication)

string

null

ssl.keystore.password

Пароль хранилища для файла хранилища ключей keystore. Необязательный параметр для клиента и требуется только в случае, если объявлено ssl.keystore.location

password

null

ssl.truststore.location

Расположение файла хранилища доверенных сертификатов (truststore)

string

null

ssl.truststore.password

Пароль для файла хранилища доверенных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, однако проверка целостности отключена

password

null

auto.offset.reset

Определяет действие, выполняемое, если в Platform V Corax нет начального смещения или если текущее смещение больше не существует на сервере (например, потому что эти данные были удалены): 1) earliest — автоматический сброс смещения до самого раннего смещения; 2) latest — автоматический сброс смещения до последнего смещения; 3) none — исключение для потребителя, если для группы потребителя не найдено предыдущего смещения; 4) иное — вызов исключения потребителю

string

latest

client.dns.lookup

Управляет тем, как клиент использует DNS-запросы. 1) Если установлено значение use\all\dns\ips, то при поиске возвращаются несколько IP-адресов для имени хоста, все они будут пытаться подключиться до сбоя подключения. Применяется как к bootstrap, так и к advertised серверов. 2) Если значение resolve\canonical\bootstrap\servers\only, каждая запись будет разрешена (resolved) и расширена (expand) в список канонических имен

string

default

connections.max.idle.ms

Закрытие незанятых соединений после указанного количества миллисекунд

long

540000

default.api.timeout.ms

Задает время ожидания (в миллисекундах) для API-интерфейсов потребителя, которые могут блокировать. Эта конфигурация используется в качестве тайм-аута по умолчанию для всех операций потребителя, которые явно не принимают timeoutparameter

int

60000

enable.auto.commit

Если установлено значение true, смещение потребителя будет периодически фиксироваться в фоновом режиме

boolean

true

exclude.internal.topics

Следует ли предоставлять потребителю записи из внутренних (системных) топиков (например, топик смещений). Если установлено значение true, единственным способом получения записей из внутренних (системных) топиков является подписка на них

boolean

true

fetch.max.bytes

Максимальный объем данных, который сервер должен вернуть в ответ на запрос по выборке. Записи извлекаются потребителем пакетами, и если первый пакет записей в первом непустом разделе выборки больше этого значения, пакет записей все равно будет возвращен, чтобы гарантировать, что потребитель работоспособен и может извлекать данные (есть все разрешения и д.р.). Таким образом, это не абсолютный максимум. Максимальный размер пакета записей, принятый брокером, определяется посредством message.max.bytes (настройки брокеров) или max.message.bytes (настройки топиков). Обратите внимание, что потребитель выполняет несколько выборок параллельно

int

52428800

isolation.level

Управляет чтением сообщений, написанных транзакционно. Если установлено значение read\committed, consumer.poll () будет возвращать только транзакционные сообщения, которые были зафиксированы. Если установлено значение read\uncommitted' (по умолчанию), consumer.poll () вернет все сообщения, даже транзакционные сообщения, которые были прерваны. Нетранзакционные сообщения будут возвращены безоговорочно в любом режиме. Сообщения всегда возвращаются в порядке смещения. Следовательно, в режиме read\committed consumer.poll () будет возвращать сообщения только до последнего стабильного смещения (LSO — LastStableOffset), которое меньше смещения первой открытой транзакции. В частности, любые сообщения, появляющиеся после сообщений, относящихся к текущим транзакциям, будут удерживаться до завершения соответствующей операции. В результате read\committed-потребители не смогут читать данные до hight watermark (высокого водяного знака), когда есть текущая транзакция (inflight). Кроме того, если установлено значение read\committed, метод seekToEnd вернет LSO.

string

read\uncommitted

max.poll.interval.ms

Максимальная задержка между вызовами poll () при использовании управления группами потребителей. Это накладывает ограничение на количество времени, которое потребитель может простаивать до получения других записей записей. Если poll () не вызывается до истечения этого тайм-аута, то потребитель считается неработоспособным и группа будет перебалансироваться, чтобы переназначить разделы другому члену

int

300000

max.poll.records

Максимальное

int

500

partition.assignment.strategy

Имя класса стратегии назначения разделов, которую клиент будет использовать для распределения партиций между экземплярами-потребителями при использовании управления группами

list

class org.apache.kafka.clients.consumer.RangeAssignor

receive.buffer.bytes

Размер буфера приема TCP (SO\RCVBUF), используемый при чтении данных. Если значение равно -1, по умолчанию будет использоваться размера буфера, установленный в ОС

int

65536

request.timeout.ms

Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны

int

30000

sasl.client.callback.handler.class

Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler

class

null

sasl.jaas.config

Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: 'loginModuleClass controlFlag (optionName=optionValue)*;'. Для брокеров конфигурация должна иметь префикс слушателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required

password

null

sasl.kerberos.service.name

Имя принципала Kerberos, под которым работает Кафка. Это можно определить либо в конфигурации JAAS Platform V Corax, либо в конфигурации Кафки

string

null

sasl.login.callback.handler.class

Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс обработчика AuthenticateCallbackHandler. Для брокеров конфигурация logincallbackhandler должна иметь префикс прослушивателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class

null

sasl.login.class

Полное имя класса, реализующего интерфейс входа в систему. Для брокеров конфигурация входа должна иметь префикс прослушивателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin

class

null

sasl.mechanism

Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого поставщик безопасности доступен. GSSAPI является механизмом по умолчанию

string

GSSAPI

security.protocol

Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL

string

PLAINTEXT

send.buffer.bytes

Размер буфера отправки TCP (SO\SNDBUF), используемый при отправке данных. Если значение равно -1, по умолчанию будет использоваться параметр, установленный в ОС

int

131072

ssl.enabled.protocols

Список протоколов, разрешенных для SSL-соединений

list

TLSv1.2,TLSv1.1,TLSv1

ssl.keystore.type

Формат файла хранилища ключей. Параметр необязателен для клиента

string

JKS

ssl.protocol

Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимыми значениями в современных JVM являются TLS, TLSv1.1 и TLSv1.2. SSL, SSLv2 и SSLv3 могут поддерживаться в более старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности

string

TLS

ssl.provider

Имя поставщика безопасности, используемого для SSL-соединений. Значение по умолчанию-поставщик безопасности по умолчанию для JVM

string

null

ssl.truststore.type

Формат файла хранилища доверительных сертификатов (truststore)

string

JKS

auto.commit.interval.ms

Частота в миллисекундах, с которой потребитель смещает offsets автоматически в Platform V Corax, если enable.auto.commit=true

int

5000

check.crcs

Автоматически проверять CRC32 потребляемых (вычитываемых) записей. Это гарантирует отсутствие повреждения сообщений в сети или на диске. Эта проверка добавляет некоторые накладные расходы, поэтому она может быть отключена в случаях, требующих экстремальной производительности

boolean

true

client.id

Строка id для передачи серверу при выполнении запросов. Цель этого состоит в том, чтобы иметь возможность отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в журнал запросов на стороне сервера

string

""

fetch.max.wait.ms

Максимальное количество времени, на которое сервер заблокируется перед ответом на запрос fetch, если данных недостаточно для немедленного удовлетворения требования fetch.min.bytes

int

500

interceptor.classes

Список классов для использования в качестве перехватчиков. Реализация интерфейса org.apache.kafka.clients.ConsumerInterceptor позволяет перехватывать (и, возможно, изменять) записи, полученные потребителем. По умолчанию перехватчиков нет

list

""

metadata.max.age.ms

Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений лидеров партиций, чтобы проактивно обнаружить новые брокеры или партиции

long

300000

metric.reporters

Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporterinterface позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX

list

""

metrics.num.samples

Количество выборок, сохраняемых для вычисления метрик

int

2

metrics.recording.level

Самый высокий уровень записи для метрик

string

INFO

metrics.sample.window.ms

Окно времени, в котором вычисляется выборка метрик

long

30000

reconnect.backoff.max.ms

Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, к которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного дрожания, чтобы избежать штормов соединения

long

1000

reconnect.backoff.ms

Базовое время ожидания перед попыткой повторного подключения к заданному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру

long

50

retry.backoff.ms

Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя

long

100

sasl.kerberos.kinit.cmd

Путь к команде Kerberos kinit

string

/usr/bin/kinit

sasl.kerberos.min.time.before.relogin

Время сна потока входа между попытками обновления

long

60000

sasl.kerberos.ticket.renew.jitter

Процент случайного дрожания, добавленного ко времени обновления

double

0.05

sasl.kerberos.ticket.renew.window.factor

Поток входа будет спать до тех пор, пока не будет достигнут указанный коэффициент времени окна от последнего обновления до истечения срока действия билета, после чего он попытается обновить билет

double

0.8

sasl.login.refresh.buffer.seconds

Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); если значение не указано, используется значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к OAUTHBEARER

short

300

sasl.login.refresh.min.period.seconds

Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию — 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к OAUTHBEARER

short

60

sasl.login.refresh.window.factor

Поток обновления входа (login) будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%)

double

0.8

sasl.login.refresh.window.jitter

Максимальное количество случайного дрожания относительно времени жизни учетных данных, добавленного к времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно; значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER

double

0.05

ssl.cipher.suites

Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров

list

null

ssl.endpoint.identification.algorithm

Алгоритм идентификации конечной точки(endpoint) для проверки имени хоста сервера с помощью сертификата сервера

string

https

ssl.keymanager.algorithm

Алгоритм, используемый фабрикой менеджера ключей для SSL-соединений. Значение по умолчанию — это алгоритм key manager factory, настроенный для виртуальной машины Java

string

SunX509

ssl.secure.random.implementation

Реализация SecureRandom PRNG, используемая для операций шифрования SSL

string

null

Журналирование в Platform V Corax#

Для журналирования Platform V Corax использует Apache Log4j 2.

Основным логгером является org.apache.kafka.streams.

Для настройки уровней журналирования необходимо использовать log4j.properties:

log4j.rootLogger=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.kafka.streams=DEBUG
log4j.additivity.org.apache.kafka.streams=false

Аудит#

Описание плагина аудита, входящего в поставку Platform V Corax, а также алгоритма его работы содержится в документе «Детальная архитектура».

Для исключения события аудита из списка событий, отправляемых в систему аудита, необходимо указать его в соответствующей настройке для LoggerAppender.

Особенности настройки плагина аудита и брокера Platform V Corax#

Пример настройки логгера для файла log4j.properties:

log4j.appender.auditAppender=com.sbt.kafka.logging.SbtHttpAuditLoggingAppender ## Класс log-appender для отправки сообщений в систему аудита
log4j.appender.auditAppender.KafkaPropertiesPath=/path/to/kafka/server.properties ## Абсолютный путь до настроек брокера Platform V Corax
log4j.appender.auditAppender.AuditRestUrl=http://localhost:8089/audit  ## URL системы аудита, куда будут отправляться сообщения
log4j.appender.auditAppender.ModelRestUrl=http://localhost:8089/audit  ## URL системы аудита, куда будут отправляться СХЕМА сообщения
log4j.appender.auditAppender.IgnoredAuditEvents=ALTER_CONFIGS,DESCRIBE_CONFIGS ## Коды событий, которые не будут отправлены в систему аудита
log4j.appender.auditAppender.layout=org.apache.log4j.SimpleLayout ## Стандартный Layout логирования

log4j.logger.kafka.audit.logging=DEBUG, auditAppender ## Пакет, откуда формируются строки для Log-appender и отправки сообщений в систему аудита
log4j.additivity.kafka.audit.loggingr=false ## Настройка, исключающая дублирование информации по логированию

Внимание

Для логирования Zookeeper необходим отдельный файл настроек log4j.

Для корректной работы плагина аудита необходимо в файле настроек брокера (например, server.properties) задать дополнительную настройку audit.conf.path — путь к файлу с настройками плагина аудита.

В файле настроек плагина аудита (путь к файлу с настройками указывается в audit.conf.path) необходимо задать настройку system.id — идентификатор системы, которая отправляет события аудита в централизованную систему аудита (по умолчанию идентификатор имеет значение 'Apache Kafka').

Пример настройки записи событий аудита в файл#

Для тестирования функциональности аудита можно настроить отправку сообщений аудита в файл.

Пример настройки:

log4j.appender.auditFileAppender = org.apache.log4j.DailyRollingFileAppender
log4j.appender.auditFileAppender.DatePattern = '.'yyyy-MM-dd
log4j.appender.auditFileAppender.File = ${kafka.logs.dir}/kafka-audit.log
log4j.appender.auditFileAppender.layout = org.apache.log4j.PatternLayout
log4j.appender.auditFileAppender.layout.ConversionPattern = [%d] %p %m (%c)%n
#log4j.appender.auditAppender = com.sbt.kafka.logging.SbtHttpAuditLoggingAppender
#log4j.appender.auditAppender.KafkaPropertiesPath = config/server.properties
#log4j.appender.auditAppender.ModelRestUrl = https://localhost:8080/publish/metamodel
#log4j.appender.auditAppender.AuditRestUrl = https://localhost:8080/publish/event
#log4j.appender.auditAppender.IgnoredAuditEvents =
#log4j.appender.auditAppender.layout = org.apache.log4j.SimpleLayout
#log4j.logger.kafka.audit.logging = DEBUG, auditAppender, auditFileAppender
log4j.logger.kafka.audit.logging = DEBUG, auditFileAppender
log4j.additivity.kafka.audit.logging = False