Тестовый запуск кластера ZooKeeper (3 узла)#

Тестовый запуск кластера ZooKeeper из трех узлов и кластера Corax из трех узлов будет производиться на трех удаленных серверах:

Server-1: <host1-ip-address> [ 1 узел ZooKeeper, 1 Брокер Kafka ]
Server-2: <host2-ip-address> [ 1 узел ZooKeeper, 1 Брокер Kafka ]
Server-3: <host3-ip-address> [ 1 узел ZooKeeper, 1 Брокер Kafka ]

Дополнительная информация:

  • версия дистрибутива Corax: kafka\2.11-0.9.0.0;

  • версия ZooKeeper: zookeeper-3.4.6;

  • имя пользователя: mon99usr;

  • \$KAFKA_HOME: /home/mon99usr/kafka_2.11-0.9.0.0, где \$KAFKA_HOME — домашняя корневая директория дистрибутива Corax.

Конфигурация кластера ZooKeeper из трех узлов

Каждый узел располагается на отдельном сервере.

Для конфигурации кластера ZooKeeper выполните следующие действия:

  1. На каждом из серверов создайте директорию, где будут храниться данные ZooKeeper:

    mkdir --p /var/tmp/kafka/zookeeper-data/
    
  2. На каждом из серверов создайте файл myid в директории /var/tmp/kafka/zookeeper-data/, созданной на предыдущем шаге (1). Это уникальный идентификатор каждого узла ZooKeeper, запишите его в файл:

    • на сервере Server-1 в файл myid запишите 1:

      echo 1 > /var/tmp/kafka/zookeeper-data/myid
      cat /var/tmp/kafka/zookeeper-data/myid
      1
      
    • на сервере Server-2 в файл myid запишите 2:

      echo 2 > /var/tmp/kafka/zookeeper-data/myid
      cat /var/tmp/kafka/zookeeper-data/myid
      2
      
    • на сервере Server-3 в файл myid запишите 3:

      echo 3 > /var/tmp/kafka/zookeeper-data/myid
      cat /var/tmp/kafka/zookeeper-data/myid
      3
      
  3. Отредактируйте файл конфигурации ZooKeeper. KAFKA_DIR/config/zookeeper.properties для трех серверов одинаков. Укажите корректные значения для параметров:

    ###################[ MAIN ]###################
    dataDir=data/zookeeper-data
    clientPort=2181
    
    ##############[ CLUSTER MODE ON ]#############
    server.1=<host1-ip-address>:2888:3888
    server.2=<host2-ip-address>:2888:3888
    server.3=<host3-ip-address>:2888:3888
    tickTime=2000
    initLimit=10
    syncLimit=5
    

    Где:

    • dataDir — директория для хранения данных ZooKeeper;

    • clientPort — порт, к которому подключаются клиенты ZooKeeper. Для PLAINTEXT — 2181, для SSL — 2182.

    • server.1 — адрес Server-1; число 1 в server.1 — это myid сервера Server-1;

    • server.2 — адрес Server-2; число 2 в server.2 — это myid сервера Server-2;

    • server.3 — адрес Server-3; число 3 в Server.3 — это myid сервера Server-3.

Запуск кластера ZooKeeper из трех узлов

  1. Выполните запуск узла ZooKeeper на сервере Server-1 в фоновом режиме с выводом логов:

    bin/zookeeper-server-start.sh -daemon
    config/zookeeper.properties && tail -f logs/zookeeper.out
    [mon99usr\@SBT-IPO-203 kafka_2.11-0.9.0.0]\$
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &&
    tail -f logs/zookeeper.out
    
    [2016-03-28 12:56:43,375] INFO Reading configuration from: config/zookeeper.properties
    
    [2016-03-28 12:56:43,377] INFO Defaulting to majority quorums
    
    [2016-03-28 12:56:43,382] INFO autopurge.snapRetainCount set to 3
    
    [2016-03-28 12:56:43,382] INFO autopurge.purgeInterval set to 0
    
    [2016-03-28 12:56:43,382] INFO Purge task is not scheduled.
    
    [2016-03-28 12:56:43,411] INFO Starting quorum peer
    
    [2016-03-28 12:56:43,424] INFO binding to port 0.0.0.0/0.0.0.0:2181
    
    [2016-03-28 12:56:43,447] INFO tickTime set to 2000
    
    [2016-03-28 12:56:43,448] INFO minSessionTimeout set to -1
    
    [2016-03-28 12:56:43,448] INFO maxSessionTimeout set to -1
    
    [2016-03-28 12:56:43,448] INFO initLimit set to 10
    
    [2016-03-28 12:56:43,467] INFO Reading snapshot /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac
    
    [2016-03-28 12:56:43,507] INFO My election bind port: /<host1-ip-address>:3888
    
    [2016-03-28 12:56:43,518] INFO LOOKING
    
    [2016-03-28 12:56:43,520] INFO New election. My id = 1, proposed zxid=0x310000003d
    
    [2016-03-28 12:56:43,522] INFO Notification: 1 (message format version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:43,530] WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888
    
    java.net.ConnectException: Connection refused
    
    [2016-03-28 12:56:43,534] WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888
    
    java.net.ConnectException: Connection refused
    
    [2016-03-28 12:56:56,388] INFO Received connection request /<host2-ip-address>:11287 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
    
    [2016-03-28 12:56:56,595] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer)
    
    [2016-03-28 12:56:56,600] INFO TCP NoDelay set to: true
    
    [2016-03-28 12:56:56,606] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    
    [2016-03-28 12:56:56,607] INFO Server environment:host.name=SBT-IPO-203.youre.adress.ru
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.version=1.8.0_60
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.home=/usr/java/jre1.8.0_60
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/../....
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.library.path=/usr/java/packages/...
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.io.tmpdir=/tmp
    
    [2016-03-28 12:56:56,607] INFO Server environment:java.compiler=\<NA\>
    
    [2016-03-28 12:56:56,607] INFO Server environment:os.name=Linux
    
    [2016-03-28 12:56:56,607] INFO Server environment:os.arch=amd64
    
    [2016-03-28 12:56:56,607] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64
    
    [2016-03-28 12:56:56,607] INFO Server environment:user.name=mon99usr
    
    [2016-03-28 12:56:56,607] INFO Server environment:user.home=/home/mon99usr
    
    [2016-03-28 12:56:56,607] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0
    
    [2016-03-28 12:56:56,609] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 Datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
    
    [2016-03-28 12:56:56,609] INFO FOLLOWING - LEADER ELECTION TOOK - 13090 (org.apache.zookeeper.server.quorum.Learner)
    
    [2016-03-28 12:56:56,616] WARN Unexpected exception, tries=0, connecting to /<host2-ip-address>:2888
    
    java.net.ConnectException: Connection refused
    
    ...
    
    [2016-03-28 12:56:57,640] INFO Getting a diff from the leader 0x310000003d
    
    [2016-03-28 12:56:57,645] INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003d
    
    [2016-03-28 12:57:04,004] WARN Got zxid 0x3200000001 expected 0x1
    
    [2016-03-28 12:57:04,004] INFO Creating new log file: log.3200000001
    
    [2016-03-28 12:57:06,472] INFO Received connection request /<host3-ip-address>:47171
    
    [2016-03-28 12:57:06,475] INFO Notification: 1 (message format version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) FOLLOWING (my state)
    

    Где tail -f logs/zookeeper.out — вывод текущего лога ZooKeeper-брокера из файла в реальном времени.

    Запуск ZooKeeper-брокера на Server-1 выполнен успешно.

    Краткий анализ логов

    • Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере <host2-ip-address> и с Брокером на сервере <host3-ip-address>, так как Брокеры на этих серверах еще не запущены:

      INFO LOOKING
      
      WARN Cannot open channel to 2 at election address /<host2-ip-address>:3888
      
      java.net.ConnectException: Connection refused
      
      WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888
      
      java.net.ConnectException: Connection refused
      
    • Принят запрос на соединение от Брокера на сервере Server-2 по адресу <host2-ip-address> в связи со запуском этого Брокера:

      INFO Received connection request /<host2-ip-address>:11287
      

      Как правило, после этого сообщения новый Брокер включается в кластер.

    • Выбран сервер-лидер — это только что запущенный Брокер на сервере Server-2. Брокер на текущем сервере Server-1 выбран в качестве сервера-последователя (FOLLOWER):

      INFO FOLLOWING
      
    • Синхронизация Брокера-последователя на сервере Server-1 с Брокером-лидером на сервере Server-2: получение от сервера-лидера Server-2 текущего состояния системы (diff). В результате на сервере Server-1 будет создан и сохранен соответствующий образ (snapshot) только что полученных данных:

      INFO Getting a diff from the leader 0x310000003d
      
      INFO Snapshotting: 0x310000003d to /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.310000003d
      
    • Принят запрос на соединение от Брокера на сервере Server-3 по адресу <host3-ip-address> в связи с запуском этого Брокера:

      INFO Received connection request /<host3-ip-address>:11287
      

      С этого момента новый Брокер включается в кластер.

    • После сообщения INFO Notification: 1... вывод логов прекращается в связи с отсутствием дальнейших событий.

  2. Выполните запуск узла ZooKeeper на сервере Server-2 в фоновом режиме с выводом логов:

    bin/zookeeper-server-start.sh -daemon
    config/zookeeper.properties && tail -f logs/zookeeper.out
    [mon99usr\@SBT-IPO-204 kafka_2.11-0.9.0.0]\$
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &&
    tail -f logs/zookeeper.out
    
    [2016-03-28 12:56:56,055] INFO Reading configuration from:
    config/zookeeper.properties
    
    [2016-03-28 12:56:56,058] INFO Defaulting to majority quorums
    
    [2016-03-28 12:56:56,067] INFO autopurge.snapRetainCount set to 3
    
    [2016-03-28 12:56:56,067] INFO autopurge.purgeInterval set to 0
    
    [2016-03-28 12:56:56,067] INFO Purge task is not scheduled.
    
    [2016-03-28 12:56:56,116] INFO Starting quorum peer
    
    [2016-03-28 12:56:56,135] INFO binding to port 0.0.0.0/0.0.0.0:2181
    
    [2016-03-28 12:56:56,184] INFO tickTime set to 2000
    
    [2016-03-28 12:56:56,185] INFO minSessionTimeout set to -1
    
    [2016-03-28 12:56:56,185] INFO maxSessionTimeout set to -1
    
    [2016-03-28 12:56:56,185] INFO initLimit set to 10
    
    [2016-03-28 12:56:56,211] INFO Reading snapshot
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.2a000000a9
    
    [2016-03-28 12:56:56,375] INFO My election bind port:
    /<host2-ip-address>:3888
    
    [2016-03-28 12:56:56,378] INFO LOOKING
    (org.apache.zookeeper.server.quorum.QuorumPeer)
    
    [2016-03-28 12:56:56,380] INFO New election. My id = 2, proposed zxid=0x310000003d
    
    [2016-03-28 12:56:56,394] INFO Notification: 1 (message format
    version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING
    (n.state), 2 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:56,396] WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888
    
    java.net.ConnectException: Connection refused
    
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    
    ...
    
    [2016-03-28 12:56:56,398] INFO Notification: 1 (message format
    version), 1 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:56,399] INFO Notification: 1 (message format
    version), 2 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:56:56,601] INFO LEADING
    
    [2016-03-28 12:56:56,605] INFO TCP NoDelay set to: true
    
    [2016-03-28 12:56:56,615] INFO Server environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    
    [2016-03-28 12:56:56,615] INFO Server environment:host.name=SBT-IPO-204.youre.adress.ru
    
    [2016-03-28 12:56:56,615] INFO Server environment:java.version=1.8.0_66
    
    [2016-03-28 12:56:56,615] INFO Server environment:java.home=/usr/java/jre1.8.0_66
    
    [2016-03-28 12:56:56,615] INFO Server environment:java.class.path=:/home/mon99usr/kafka_2.11-0.9.0.0/bin/....
    
    [2016-03-28 12:56:56,616] INFO Server environment:java.library.path=/usr/java/...
    
    [2016-03-28 12:56:56,616] INFO Server environment:java.io.tmpdir=/tmp
    
    [2016-03-28 12:56:56,616] INFO Server environment:java.compiler=\<NA\>
    
    [2016-03-28 12:56:56,616] INFO Server environment:os.name=Linux
    
    [2016-03-28 12:56:56,616] INFO Server environment:os.arch=amd64
    
    [2016-03-28 12:56:56,616] INFO Server environment:os.version=3.10.0-123.9.3.el7.x86_64
    
    [2016-03-28 12:56:56,616] INFO Server environment:user.name=mon99usr
    
    [2016-03-28 12:56:56,616] INFO Server environment:user.home=/home/mon99usr
    
    [2016-03-28 12:56:56,616] INFO Server environment:user.dir=/home/mon99usr/kafka_2.11-0.9.0.0
    
    [2016-03-28 12:56:56,618] INFO Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000
    
    datadir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
    
    [2016-03-28 12:56:56,619] INFO LEADING - LEADER ELECTION TOOK - 239
    
    [2016-03-28 12:56:57,627] INFO Follower sid: 1 : info :
    org.apache.zookeeper.server.quorum.QuorumPeer\$...
    
    [2016-03-28 12:56:57,638] INFO Synchronizing with Follower sid: 1
    maxCommittedLog=0x310000003d minCommittedLog=0x2e0000006f
    peerLastZxid=0x310000003d
    
    [2016-03-28 12:56:57,638] INFO Sending DIFF
    
    [2016-03-28 12:56:57,666] INFO Received NEWLEADER-ACK message from 1
    
    [2016-03-28 12:56:57,669] INFO Have quorum of supporters, sids: [ 1,2]; starting up and setting last processed zxid: ..
    
    [2016-03-28 12:57:04,000] INFO Expiring session 0x253bc816e870000, timeout of 6000ms exceeded
    
    [2016-03-28 12:57:04,001] INFO Processed session termination for sessionid: 0x253bc816e870000
    
    [2016-03-28 12:57:04,002] INFO Creating new log file: log.3200000001
    
    [2016-03-28 12:57:06,475] INFO Received connection request
    /<host3-ip-address>:34164
    
    [2016-03-28 12:57:06,477] INFO Notification: 1 (message format
    version), 3 (n.leader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31 (n.peerEpoch) LEADING (my state)
    
    [2016-03-28 12:57:06,513] INFO Follower sid: 3 : info :
    org.apache.zookeeper.server.quorum.QuorumPeer\$QuorumServer@...
    
    [2016-03-28 12:57:06,517] INFO Synchronizing with Follower sid: 3
    maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070
    peerLastZxid=0x310000003d
    
    [2016-03-28 12:57:06,518] INFO Sending DIFF
    
    [2016-03-28 12:57:06,540] INFO Received NEWLEADER-ACK message from 3
    

    Запуск брокера на сервере Server-2 выполнен успешно.

    Краткий анализ логов

    • Поиск других Брокеров с целью выбора лидера, отказ соединения с Брокером на сервере <host3-ip-address>, так как Брокер на этом сервере еще не запущен:

      INFO LOOKING
      
      WARN Cannot open channel to 3 at election address /<host3-ip-address>:3888
      
      java.net.ConnectException: Connection refused
      
    • Лидером выбран Брокер на сервере Server-2 (на выбор лидера ушло 239 миллисекунд). От Брокера на сервере Server-1 получено соответствующее подтверждение. Выведено сообщение, что создан новый кворум [1,2] из двух источников-брокеров с идентификаторами myid = 1 и 2 соответственно.

      INFO LEADING
      
      INFO LEADING - LEADER ELECTION TOOK -- 239
      
      INFO Have quorum of supporters, sids: [ 1,2 ];
      
    • Синхронизация Брокера-последователя на сервере Server-1 с Брокером-лидером на сервере Server-2. Брокер-лидер отправляет текущее состояние системы (DIFF) и тем самым запускает синхронизацию. После обработки DIFF Брокером-последователем (сохранения DIFF в виде образа (snapshot) текущего состояния системы) от Брокера-последователя поступает соответствующее подтверждение о получении Received NEWLEADER-ACK:

      INFO Follower sid: 1 : info : org.apache.zookeeper.server.quorum.Q...
      
      INFO Synchronizing with Follower sid: 1
      
      INFO Sending DIFF
      
      INFO Received NEWLEADER-ACK message from 1
      
    • Принят запрос на соединение от нового Брокера на сервере Server-3 по адресу <host3-ip-address> в связи со запуском этого Брокера:

      INFO Received connection request /<host3-ip-address>:34164
      
    • Новый Брокер с идентификатором myid=3 принят в кворум в качестве Брокера-последователя:

      INFO Follower sid: 3
      
    • Синхронизация Брокера-последователя на сервере Server-3 с Брокером-лидером на сервере Server-2. Брокеру-последователю отправлено текущее состояние системы (DIFF), успешно обработано Брокером-последователем, в ответ выслано соответствующее подтверждение:

      INFO Synchronizing with Follower sid: 3 maxCommittedLog=0x3200000001 minCommittedLog=0x2e00000070 peerLastZxid=0x310000003d
      
      [2016-03-28 12:57:06,518] INFO Sending DIFF
      
      [2016-03-28 12:57:06,540] INFO Received NEWLEADER-ACK message from 3
      
  3. Выполните запуск узла ZooKeeper на сервере Server-3 в фоновом режиме с выводом логов:

    > bin/zookeeper-server-start.sh -daemon
    
    config/zookeeper.properties && tail -f logs/zookeeper.out
    
    [mon99usr\@SBT-IPO-208 kafka_2.11-0.9.0.0]\$
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties && tail -f logs/zookeeper.out
    
    [2016-03-28 12:57:07,273] INFO Reading configuration from:
    config/zookeeper.properties
    
    [2016-03-28 12:57:07,276] INFO Defaulting to majority quorums
    
    [2016-03-28 12:57:07,282] INFO autopurge.snapRetainCount set to 3
    
    [2016-03-28 12:57:07,282] INFO autopurge.purgeInterval set to 0
    
    [2016-03-28 12:57:07,282] INFO Purge task is not scheduled.
    
    [2016-03-28 12:57:07,303] INFO Starting quorum peer
    
    [2016-03-28 12:57:07,321] INFO binding to port 0.0.0.0/0.0.0.0:2181
    
    [2016-03-28 12:57:07,357] INFO tickTime set to 2000
    
    [2016-03-28 12:57:07,357] INFO minSessionTimeout set to -1
    
    [2016-03-28 12:57:07,357] INFO maxSessionTimeout set to -1
    
    [2016-03-28 12:57:07,357] INFO initLimit set to 10
    
    [2016-03-28 12:57:07,378] INFO Reading snapshot
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2/snapshot.30000000ac
    
    [2016-03-28 12:57:07,430] INFO My election bind port:
    /<host3-ip-address>:3888
    
    [2016-03-28 12:57:07,444] INFO LOOKING
    
    [2016-03-28 12:57:07,445] INFO New election. My id = 3, proposed zxid=0x31000003d
    
    [2016-03-28 12:57:07,459] INFO Notification: 1 (message format
    version), 3 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x31(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,460] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x31(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,460] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), FOLLOWING (n.state), 1 (n.sid), 0x2 (n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,461] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x31(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,462] INFO Notification: 1 (message format
    version), 2 (n.lader), 0x310000003d (n.zxid), 0x1 (n.round), LEADING (n.state), 2 (n.sid), 0x32(n.peerEpoch) LOOKING (my state)
    
    [2016-03-28 12:57:07,462] INFO FOLLOWING (org.apache.zookeeper.server.quorum.QuorumPeer)
    
    [2016-03-28 12:57:07,469] INFO TCP NoDelay set to: true
    
    [2016-03-28 12:57:07,478] INFO Server
    environment:zookeeper.version=3.4.6-156995, built on 02/20/2014 09:09 GMT
    
    [2016-03-28 12:57:07,478] INFO Server environment:host.name=SBT-IPO-208.youre.adress.ru
    
    [2016-03-28 12:57:07,487] INFO Server environment:java.version=1.8.0_71
    
    [2016-03-28 12:57:07,487] INFO Server
    environment:java.home=/usr/java/jre1.8.0_1
    
    [2016-03-28 12:57:07,487] INFO Server
    environment:java.class.path=:/home/mon99ur/...
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:java.library.path=/usr/java/packages/...
    
    [2016-03-28 12:57:07,488] INFO Server environment:java.io.tmpdir=/tmp
    
    [2016-03-28 12:57:07,488] INFO Server environment:java.compiler=\<NA\>
    
    [2016-03-28 12:57:07,488] INFO Server environment:os.name=Linux
    
    [2016-03-28 12:57:07,488] INFO Server environment:os.arch=amd64
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:os.version=3.10.0-123.9.3.el7x86_64
    
    [2016-03-28 12:57:07,488] INFO Server environment:user.name=mon99usr
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:user.home=/home/mon99usr
    
    [2016-03-28 12:57:07,488] INFO Server
    environment:user.dir=/home/mon99usr/kafka2.11-0.9.0.0
    
    [2016-03-28 12:57:07,490] INFO Created server with tickTime 2000
    minSessionTimeout 4000 maxSessionTimeout 40000 datadir
    
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2 snapdir
    /var/tmp/kafka_2.11-0.9.0.0/zookeeper-data/version-2
    
    [2016-03-28 12:57:07,491] INFO FOLLOWING - LEADER ELECTION TOOK - 46
    
    [2016-03-28 12:57:07,502] INFO Getting a diff from the leader
    0x3200000001
    
    [2016-03-28 12:57:07,506] WARN Got zxid 0x3200000001 expected 0x1
    
    [2016-03-28 12:57:07,508] INFO Snapshotting: 0x3200000001 to
    /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001
    

    Запуск брокера на сервере Server-3 выполнен успешно.

    Краткий анализ логов

    • Поиск других Брокеров с целью выбора лидера, инициация алгоритма выбора лидера, предложение своей кандидатуры в качестве лидера аналогичны инициализации двух предыдущих Брокеров:

      INFO LOOKING
      
      INFO New election. My id = 3, proposed zxid=0x31000003d
      
    • Брокер на сервере Server-3 выбран в качестве Брокера-последователя:

      INFO FOLLOWING
      
      INFO FOLLOWING - LEADER ELECTION TOOK -- 46
      
    • Синхронизация с Брокером на сервере Server-2. Получение состояния системы и сохранение образа:

      INFO Getting a diff from the leader 0x3200000001
      
      INFO Snapshotting: 0x3200000001 to /var/tmp/kafka_2.1-0.9.0.0/zookeeper-data/version-2/snapshot.3200000001
      

Конфигурация кластера Corax из трех узлов

Каждый узел расположен на отдельном сервере.

В целях тестирования доступности сервиса ZooKeeper с различным количеством работающих узлов сконфигурируйте кластер Corax (в сценарии рассмотрена конфигурация кластера при установке Corax по профилю no-auth):

  1. Отредактируйте файл конфигурации KAFKA_DIR/config/server.properties для каждого Corax-брокера. Укажите корректные значения для broker.id и zookeeper.connect:

    • Для сервера Server-1:

      ##########################[ MAIN ]#########################
      broker.id=1
      log.dirs=data/kafka-data
      
      ########################[ ZOOKEEPER ]######################
      zookeeper.connect=<host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181
      zookeeper.connection.timeout.ms=6000
      zookeeper.session.timeout.ms=18000
      
      #####################[ AUTHENTICATION ]####################
      listeners=PLAINTEXT://<host1-ip-address>:9092
      
    • Для сервера Server-2:

      ##########################[ MAIN ]#########################
      broker.id=2
      log.dirs=data/kafka-data
      
      ########################[ ZOOKEEPER ]######################
      zookeeper.connect=<host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181
      zookeeper.connection.timeout.ms=6000
      zookeeper.session.timeout.ms=18000
      
      #####################[ AUTHENTICATION ]####################
      listeners=PLAINTEXT://<host2-ip-address>:9092
      
    • Для сервера Server-3:

      ##########################[ MAIN ]#########################
      broker.id=3
      log.dirs=data/kafka-data
      
      ########################[ ZOOKEEPER ]######################
      zookeeper.connect=<host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181
      zookeeper.connection.timeout.ms=6000
      zookeeper.session.timeout.ms=18000
      
      #####################[ AUTHENTICATION ]####################
      listeners=PLAINTEXT://<host3-ip-address>:9092
      

    Где:

    • broker.id — идентификатор Corax-брокера, обязательно должен быть уникальным в рамках кластера Corax;

    • log.dirs — директория, в которой хранятся логи Corax;

    • listeners — IP-адрес сервера, на котором находится Corax-брокер в формате PROTOCOL://IP:PORT:

      • PROTOCOL — протокол подключения к брокеру. Может принимать значения PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL;

      • IP — адрес сервера, на котором выполняется Corax-брокер;

      • PORT — порт, к которому подключаются клиенты Corax. Для PLAINTEXT — 9092, для SSL — 9093.

    • zookeeper.connect — список адресов ZooKeeper-брокеров в формате IP:PORT, где:

      • IP — адрес сервера, на котором выполняется ZooKeeper-брокер;

      • PORT — порт, к которому подключаются клиенты ZooKeeper. Для PLAINTEXT — 2181, для SSL — 2182.

Запуск кластера Corax из трех узлов

  1. Выполните запуск узла Corax на сервере Server-1 в фоновом режиме с выводом логов:

    bin/kafka-server-start.sh -daemon
    
    config/server.properties && tail -f logs/server.log
    
    [mon99usr\@SBT-IPO-203 kafka]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log
    
    [2016-03-28 17:16:50,514] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 11 milliseconds.
    
    [2016-03-28 17:16:50,531] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:16:50,532] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:16:50,537] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:16:50,549] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:16:50,555] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:16:50,556] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -> EndPoint(<host1-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:16:50,571] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:06,309] INFO KafkaConfig values:
    
    advertised.host.name = null
    
    metric.reporters = []
    
    quota.producer.default = 9223372036854775807
    
    offsets.topic.num.partitions = 50
    
    log.flush.interval.messages = 9223372036854775807
    
    auto.create.topics.enable = true
    
    controller.socket.timeout.ms = 30000
    
    log.flush.interval.ms = null
    
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    
    replica.socket.receive.buffer.bytes = 65536
    
    min.insync.replicas = 1
    
    replica.fetch.wait.max.ms = 500
    
    num.recovery.threads.per.data.dir = 1
    
    ssl.keystore.type = JKS
    
    default.replication.factor = 1
    
    ssl.truststore.password = null
    
    log.preallocate = false
    
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    
    fetch.purgatory.purge.interval.requests = 1000
    
    ssl.endpoint.identification.algorithm = null
    
    replica.socket.timeout.ms = 30000
    
    message.max.bytes = 1000012
    
    num.io.threads = 8
    
    offsets.commit.required.acks = -1
    
    log.flush.offset.checkpoint.interval.ms = 60000
    
    delete.topic.enable = false
    
    quota.window.size.seconds = 1
    
    ssl.truststore.type = JKS
    
    offsets.commit.timeout.ms = 5000
    
    quota.window.num = 11
    
    zookeeper.connect = <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181
    
    authorizer.class.name =
    
    num.replica.fetchers = 1
    
    log.retention.ms = null
    
    log.roll.jitter.hours = 0
    
    log.cleaner.enable = false
    
    offsets.load.buffer.size = 5242880
    
    log.cleaner.delete.retention.ms = 86400000
    
    ssl.client.auth = none
    
    controlled.shutdown.max.retries = 3
    
    queued.max.requests = 500
    
    offsets.topic.replication.factor = 3
    
    log.cleaner.threads = 1
    
    sasl.kerberos.service.name = null
    
    sasl.kerberos.ticket.renew.jitter = 0.05
    
    socket.request.max.bytes = 104857600
    
    ssl.trustmanager.algorithm = PKIX
    
    zookeeper.session.timeout.ms = 6000
    
    log.retention.bytes = -1
    
    sasl.kerberos.min.time.before.relogin = 60000
    
    zookeeper.set.acl = false
    
    connections.max.idle.ms = 600000
    
    offsets.retention.minutes = 1440
    
    replica.fetch.backoff.ms = 1000
    
    inter.broker.protocol.version = 0.9.0.X
    
    log.retention.hours = 168
    
    num.partitions = 1
    
    listeners = null
    
    ssl.provider = null
    
    ssl.enabled.protocols = [TLSv1.3, TLSv1.2]
    
    log.roll.ms = null
    
    log.flush.scheduler.interval.ms = 9223372036854775807
    
    ssl.cipher.suites = null
    
    log.index.size.max.bytes = 10485760
    
    ssl.keymanager.algorithm = SunX509
    
    security.inter.broker.protocol = PLAINTEXT
    
    replica.fetch.max.bytes = 1048576
    
    advertised.port = null
    
    log.cleaner.dedupe.buffer.size = 524288000
    
    replica.high.watermark.checkpoint.interval.ms = 5000
    
    log.cleaner.io.buffer.size = 524288
    
    sasl.kerberos.ticket.renew.window.factor = 0.8
    
    zookeeper.connection.timeout.ms = null
    
    controlled.shutdown.retry.backoff.ms = 5000
    
    log.roll.hours = 168
    
    log.cleanup.policy = delete
    
    host.name = <host1-ip-address>
    
    log.roll.jitter.ms = null
    
    max.connections.per.ip = 2147483647
    
    offsets.topic.segment.bytes = 104857600
    
    background.threads = 10
    
    quota.consumer.default = 9223372036854775807
    
    request.timeout.ms = 30000
    
    log.index.interval.bytes = 4096
    
    log.dir = /tmp/kafka-logs
    
    log.segment.bytes = 1073741824
    
    log.cleaner.backoff.ms = 15000
    
    offset.metadata.max.bytes = 4096
    
    ssl.truststore.location = null
    
    group.max.session.timeout.ms = 30000
    
    ssl.keystore.password = null
    
    zookeeper.sync.time.ms = 2000
    
    port = 9092
    
    log.retention.minutes = null
    
    log.segment.delete.delay.ms = 60000
    
    log.dirs = /var/tmp/kafka/kafka-logs
    
    controlled.shutdown.enable = true
    
    compression.type = producer
    
    max.connections.per.ip.overrides =
    
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    
    auto.leader.rebalance.enable = true
    
    leader.imbalance.check.interval.seconds = 300
    
    log.cleaner.min.cleanable.ratio = 0.5
    
    replica.lag.time.max.ms = 10000
    
    num.network.threads = 3
    
    ssl.key.password = null
    
    reserved.broker.max.id = 1000
    
    metrics.num.samples = 2
    
    socket.send.buffer.bytes = 102400
    
    ssl.protocol = TLS
    
    socket.receive.buffer.bytes = 102400
    
    ssl.keystore.location = null
    
    replica.fetch.min.bytes = 1
    
    unclean.leader.election.enable = true
    
    group.min.session.timeout.ms = 6000
    
    log.cleaner.io.buffer.load.factor = 0.9
    
    offsets.retention.check.interval.ms = 600000
    
    producer.purgatory.purge.interval.requests = 1000
    
    metrics.sample.window.ms = 30000
    
    broker.id = 1
    
    offsets.topic.compression.codec = 0
    
    log.retention.check.interval.ms = 300000
    
    advertised.listeners = null
    
    leader.imbalance.per.broker.percentage = 10
    
    (kafka.server.KafkaConfig)
    
    [2016-03-28 17:17:06,377] INFO starting (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:06,382] INFO Connecting to zookeeper on <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:06,748] INFO Loading logs. (kafka.log.LogManager)
    
    [2016-03-28 17:17:06,986] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-0/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:17:06,987] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log)
    
    [2016-03-28 17:17:06,988] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log)
    
    [2016-03-28 17:17:06,990] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-1/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:17:06,991] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log)
    
    [2016-03-28 17:17:06,992] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log)
    
    [2016-03-28 17:17:06,994] INFO Logs loading complete. (kafka.log.LogManager)
    
    [2016-03-28 17:17:06,995] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:17:06,996] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:17:07,040] INFO Awaiting socket connections on SBT-IPO-203.youre.adress.ru:9092. (kafka.network.Acceptor)
    
    [2016-03-28 17:17:07,043] INFO [Socket Server on Broker 1], Started 1 acceptor threads (kafka.network.SocketServer)
    
    [2016-03-28 17:17:07,062] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:07,063] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:07,124] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:07,142] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:07,142] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    
    [2016-03-28 17:17:09,020] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener)
    
    [2016-03-28 17:17:09,051] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:17:09,053] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:09,053] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:17:09,057] INFO [ExpirationReaper-1], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:17:09,060] INFO [Group Metadata Manager on Broker 1]: Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:17:09,075] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:17:09,084] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:17:09,128] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:09,138] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:17:09,139] INFO Registered broker 1 at path /brokers/ids/1 with addresses: PLAINTEXT -\> EndPoint(<host1-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:17:09,151] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:17:09,409] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [NUMBERS,0],[NUMBERS,1] (kafka.server.ReplicaFetcherManager)
    
    [2016-03-28 17:17:10,584] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions[NUMBERS,0] (kafka.server.ReplicaFetcherManager)
    

    Где tail -f logs/server.log — вывод текущего лога Corax-брокера из файла в реальном времени.

    Запуск Corax-брокера на сервере Server-1 выполнен успешно.

  2. Выполните запуск узла ZooKeeper на сервере Server-2 в фоновом режиме с выводом логов:

    bin/kafka-server-start.sh -daemon
    
    config/server.properties && tail -f logs/server.log
    
    [mon99usr\@SBT-IPO-204 kafka]\$ bin/kafka-server-start.sh -daemon config/server.properties && tail -f logs/server.log
    
    [2016-03-28 17:35:35,726] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,36]
    
    [2016-03-28 17:35:35,728] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,36] in 2 milliseconds.
    
    [2016-03-28 17:36:11,334] INFO KafkaConfig values:
    
    ....
    
    (kafka.server.KafkaConfig)
    
    [2016-03-28 17:36:11,441] INFO starting (kafka.server.KafkaServer)
    
    [2016-03-28 17:36:11,450] INFO Connecting to zookeeper on <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 (kafka.server.KafkaServer)
    
    [2016-03-28 17:36:11,870] INFO Loading logs. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,775] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-1/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:36:12,776] INFO Recovering unflushed segment 0 in log NUMBERS-1. (kafka.log.Log)
    
    [2016-03-28 17:36:12,777] INFO Completed load of log NUMBERS-1 with log end offset 14 (kafka.log.Log)
    
    [2016-03-28 17:36:12,780] WARN Found an corrupted index file, /var/tmp/kafka/kafka-logs/NUMBERS-0/<значение индекса>.index, deleting and rebuilding index... (kafka.log.Log)
    
    [2016-03-28 17:36:12,781] INFO Recovering unflushed segment 0 in log NUMBERS-0. (kafka.log.Log)
    
    [2016-03-28 17:36:12,782] INFO Completed load of log NUMBERS-0 with log end offset 13 (kafka.log.Log)
    
    [2016-03-28 17:36:12,785] INFO Logs loading complete. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,786] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,799] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:36:12,858] INFO Awaiting socket connections on SBT-IPO-204.youre.adress.ru:9092. (kafka.network.Acceptor)
    
    [2016-03-28 17:36:12,861] INFO [Socket Server on Broker 2], Started 1 acceptor threads (kafka.network.SocketServer)
    
    [2016-03-28 17:36:13,054] INFO [GroupCoordinator 2]: Starting up. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:36:13,064] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:36:13,064] INFO [GroupCoordinator 2]: Startup complete. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:36:13,073] INFO [Group Metadata Manager on Broker 2]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:36:13,075] INFO [ExpirationReaper-2], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:36:13,087] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:36:13,088] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:36:13,094] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:36:13,119] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:36:13,133] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:36:13,135] INFO Registered broker 2 at path /brokers/ids/2 with addresses: PLAINTEXT -\> EndPoint(<host2-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:36:13,149] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:36:15,082] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [test2,1],[test,14],[test2,13],[test,15],[test,3],[test2,11],[test,6],[test5,1],[test2,5],[ThrashMetal,2],[test,17],[test5,0],[test,5],[test,11],[test3,1],[test2,15],[test2,18],[test2,8],[test,16],[test,1],[NUMBERS,0],[test,4],[test,19],[test2,3],[test2,10],[ThrashMetal,0],[test,18],[test,13],[test2,2],[test2,17],[test,12],[test2,7],[test,2],[test,8],[test2,16],[NUMBERS,1],[test2,6],[test,0],[test2,4],[test2,0],[test,7],[test2,9],[test,9],[superTest,1],[test3,0],[test2,14],[ThrashMetal,1],[superTest,0],[test2,19],[test,10],[test2,12] (kafka.server.ReplicaFetcherManager)
    
    [2016-03-28 17:36:15,096] INFO Truncating log NUMBERS-0 to offset 13. (kafka.log.Log)
    
    [2016-03-28 17:36:15,100] INFO Truncating log NUMBERS-1 to offset 14. (kafka.log.Log)
    
    [2016-03-28 17:36:15,162] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([[NUMBERS,1], ...) initOffset 14 to broker BrokerEndPoint(1,<host1-ip-address>,9092)]
    
    [2016-03-28 17:36:15,171] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread)
    
    [2016-03-28 17:36:15,354] INFO [Group Metadata Manager on Broker 2]: Loading offsets and group metadata from [consumer_offsets,22] (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:36:15,424] INFO [Group Metadata Manager on Broker 2]: Finished loading offsets from [consumer_offsets,22] in 69 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:37:14,092] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [NUMBERS,1] (kafka.server.ReplicaFetcherManager)
    

    Запуск Corax-брокера на сервере Server-2 выполнен успешно.

  3. Выполните запуск узла Corax на сервере Server-3 в фоновом режиме с выводом логов:

    bin/kafka-server-start.sh -daemon
    
    config/server.properties && tail -f logs/server.log
    
    [mon99usr\@SBT-IPO-208 kafka]\$ bin/kafka-server-start.sh -daemon config/server-3-servers.properties && tail -f logs/server.log
    
    Kafka application log tail -f from /home/mon99usr/kafka/logs/server.log
    
    [ [2016-03-28 17:45:56,259] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:45:56,280] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:45:56,281] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -> EndPoint(<host3-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:45:56,300] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
    
    [2016-03-28 17:50:45,138] INFO New leader is 2 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener)
    
    [2016-03-28 17:50:57,090] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:50:57,096] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:50:57,097] INFO 3 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    
    [2016-03-28 17:51:00,733] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector\$LeaderChangeListener)
    
    [2016-03-28 17:52:37,509] INFO KafkaConfig values:
    
    ...
    
    (kafka.server.KafkaConfig)
    
    [2016-03-28 17:52:37,675] INFO starting (kafka.server.KafkaServer)
    
    [2016-03-28 17:52:37,686] INFO Connecting to zookeeper on <host1-ip-address>:2181,<host2-ip-address>:2181,<host3-ip-address>:2181 (kafka.server.KafkaServer)
    
    [2016-03-28 17:52:38,083] INFO Loading logs.
    
    [2016-03-28 17:52:38,090] INFO Logs loading complete.
    
    [2016-03-28 17:52:38,091] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:52:38,103] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
    
    [2016-03-28 17:52:38,163] INFO Awaiting socket connections on SBT-IPO-208.youre.adress.ru:9092. (kafka.network.Acceptor)
    
    [2016-03-28 17:52:38,169] INFO [Socket Server on Broker 3], Started 1 acceptor threads (kafka.network.SocketServer)
    
    [2016-03-28 17:52:38,199] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,203] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,378] INFO [GroupCoordinator 3]: Starting up. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:52:38,394] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 13 milliseconds. (kafka.coordinator.GroupMetadataManager)
    
    [2016-03-28 17:52:38,396] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,397] INFO [GroupCoordinator 3]: Startup complete. (kafka.coordinator.GroupCoordinator)
    
    [2016-03-28 17:52:38,407] INFO [ExpirationReaper-3], Starting (kafka.server.DelayedOperationPurgatory\$ExpiredOperationReaper)
    
    [2016-03-28 17:52:38,432] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:52:38,437] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager\$ThrottledRequestReaper)
    
    [2016-03-28 17:52:38,445] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader\$)
    
    [2016-03-28 17:52:38,485] INFO Creating /brokers/ids/3 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:52:38,504] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
    
    [2016-03-28 17:52:38,506] INFO Registered broker 3 at path /brokers/ids/3 with addresses: PLAINTEXT -\> EndPoint(<host3-ip-address>,9092,PLAINTEXT) (kafka.utils.ZkUtils)
    
    [2016-03-28 17:52:38,524] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
    

    Запуск Corax-брокера на сервере Server-3 выполнен успешно.

Запуск кластеров Corax и ZooKeeper выполнен успешно.

Тест доступности ZooKeeper / работоспособности Corax при различных условиях работы кластера ZooKeeper

  1. Тест работоспособности Corax при всех работающих узлах кластера ZooKeeper.

    • Вывод списка топиков:

      bin/kafka-topics.sh
      
      --list
      
      --bootstrap-server <host1-ip-address>:<port>,<host2-ip-address>:<port>,<host3-ip-address>:<port>
      
      NUMBERS
      

      В списке содержится топик NUMBERS.

    • Отправка сообщений в топик NUMBERS:

      bin/kafka-console-producer.sh
      
      --bootstrap-server
      <host1-ip-address>:<port>,<host2-ip-address>:<port>,<host3-ip-address>:<port>
      
      --topic NUMBERS
      
      1
      
      2
      
      3
      

      Запись в топик успешна.

    • Чтение сообщений из топика:

      bin/kafka-console-consumer.sh
      
      --group test-group
      
      --bootstrap-server <host2-ip-address>:9092
      
      --topic NUMBERS
      
      --from-beginning
      
      1
      
      2
      
      3
      

      Чтение из топика успешно.

  2. Тест работоспособности Corax при отказе одного узла кластера ZooKeeper:

    • список топиков выводится;

    • чтение сообщений из топика выполняется успешно;

    • отправка сообщений в топик выполняется успешно.

  3. Тест работоспособности Corax при отказе двух узлов кластера ZooKeeper:

    • список топиков не выводится;

    • чтение сообщений из топика выполняется успешно;

    • отправка сообщений в топик выполняется успешно.

  4. Тест работоспособности Corax при полном отказе кластера ZooKeeper:

    • список топиков не выводится;

    • чтение сообщений из топика выполняется успешно;

    • отправка сообщений в топик выполняется успешно.