Конфигурирование кластера Corax#

Конфигурирование кластера серверов Corax, как и конфигурирование одного узла, осуществляется в файле server.properties на каждом сервере, который планируется включить в кластер.

Минимальная конфигурация кластера#

Пример конфигурационного файла server.properties для кластера Corax из двух брокеров при установке по профилю no-auth (приведен пример конфигурации для одного из брокеров):

broker.id=1

listeners=PLAINTEXT://10.XX.XX.XX:YYYY

zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYY

log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs

Шаги создания конфигурационного файла:

  1. Присвойте ключу broker.id необходимый числовой идентификатор.

    Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.

    Пример

    На первом сервере Corax:

    broker.id=1

    На втором сервере:

    broker.id=2

  2. Присвойте ключу listeners IP-адрес сервера.

    Пример

    listeners=PLAINTEXT://10.XX.XX.XX:YYYY
    

    Если на одном физическом сервере планируется запускать несколько серверов Corax, то для каждого сервера необходимо также указать уникальный порт, заданный в ключе listeners. По умолчанию сервер Corax слушает порт 9092.

  3. Для ключа zookeeper.connect укажите IP и PORT сервиса ZooKeeper в формате zookeeper.connect=IP:PORT. Если кластер ZooKeeper запущен, то укажите адреса ZooKeeper-сервисов через запятую:

    Пример

    zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYY

  4. Для ключа log.dirs задайте директорию, в которой планируется хранение сообщений, передаваемых в Corax.

    Пример

    log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs

    Примечание

    Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.

    Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:

    log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logs

На данном этапе минимальная конфигурация узлов кластера Corax завершается. Если не существует каких-либо ошибок в конфигурации, а также ZooKeeper сконфигурирован корректно и запущен, то этого вполне достаточно для успешного запуска и работы кластера Corax.

Расширенная конфигурация кластера#

Конфигурирование производителя (producer)#

Минимальная конфигурация

Пример конфигурационного файла producer.properties для одного брокера (в примере в качестве хоста производителя указан текущий хост — hostname -f):

acks=1

bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZ

Шаги создания конфигурационного файла:

  1. Присвойте ключу acks значение, указывающее, что сообщение отправлено:

    • 0 — сообщение расположено в TCP-буфере для отправки по сети;

    • 1 — сообщение успешно записано в партицию-лидер;

    • 1 (all) — сообщение успешно записано на все ISR (InSyncReplicas параметр на Kafka broker (server.properties))

    Пример

    acks=1

  2. Для ключа bootstrap.servers укажите IP:PORT сервиса Corax.

    Пример

    Если запущены 3 экземпляра Corax на том же сервере:

    bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZ
    

    Допускается также указывать IP-адреса, полные доменные имена.

Расширенная конфигурация

Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.

Запуск каждого отдельного производителя Corax выполняется командой:

./bin/kafka-console-producer --bootstrap-server `hostname -f`:9101, `hostname -f`:9102, `hostname -f`:9103 --topic test1 --producer.config etc/kafka/producer.properties

Название

Описание

Тип

Значение по умолчанию

key.serializer

Класс сериализатора для ключа, реализующего org.apache.kafka.common.serialization.Serializerinterface

class

Неприменимо

value.serializer

Класс сериализатора для значения, реализующего org.apache.kafka.common.serialization.Serializerinterface

class

Неприменимо

acks

Количество подтверждений, которые производитель должен получить от лидера, прежде чем считать запрос завершенным. Это определяет долговечность записей, которые будут отправлены. Разрешены следующие настройки:
1) acks=0 — производитель не будет ждать подтверждения от сервера. Запись будет немедленно добавлена в буфер сокета и будет считаться отправленной. В этом случае нельзя гарантировать, что сервер получил запись, и конфигурация повторных попыток не вступит в силу (поскольку клиент обычно не знает о каких-либо сбоях). Смещение, заданное для каждой записи, всегда будет равно -1;
2) acks=1 — означает, что лидер запишет в свой локальный лог-файл, но ответит, не дожидаясь полного подтверждения от всех ISR (In-Sync Replicas — все реплики партиции, синхронизированные с лидером). Если лидер партиции завершит работу с ошибкой после подтверждения записи, но до того как реплицируются записи на ISR, запись будет потеряна;
3) acks=all — лидер будет ждать полного набора синхронизированных реплик (ISR), чтобы подтвердить запись. Это гарантирует, что запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается в рабочем состоянии. Это самая надежная из доступных гарантий. Эквивалентно настройке acks=-1.
Для включения идемпотентности требуется, чтобы acks=all. Если заданы конфликтующие конфигурации и идемпотентность явно не включена, идемпотентность будет отключена

string

all

bootstrap.servers

Список пар «хост: порт», используемых для установления начального соединения с кластером Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде: host1:port1, host2:port2, … Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (можно указать более одного, на случай, если указанный сервер не работает)

list

«»

buffer.memory

Общее количество байтов памяти, которое производитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, производитель блокируется на время, заданное параметром max.block.ms, после чего он выдаст исключение. Этот параметр должен примерно соответствовать общей памяти, которую будет использовать производитель, но не является жесткой привязкой, поскольку не вся память, используемая производителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержания inflight-запросов

long

33554432

compression.type

Тип сжатия для всех данных, генерируемых производителем. Допустимые значения: none (без сжатия), gzip, snappy, lz4 или zstd. Сжатие полных партий данных, поэтому эффективность разбивки на порции данных также повлияет на степень сжатия (большее дозирование означает лучшее сжатие)

string

none

retries

Установка значения больше нуля будет означать для клиента переотправку любой записи, отправка которой завершилась предположительно временной ошибкой. Эта попытка не отличается от случая, когда клиент получает ошибку. Разрешение повторов (retries) без установки параметра max.in.flight.requests.per.connection=1 потенциально изменит порядок записей, т. к. если два пакета отправляются в один раздел и первый пакет вызывает сбой и повторяется, но второй успешно записан, то запись о втором пакете может появиться первой. Запросы на запись будут завершены ошибкой до того, как количество попыток будет исчерпано, а если тайм-аут настроен, delivery.timeout.msexpires будет достигнут до успешного подтверждения. Пользователи обычно предпочитают не указывать этот параметр, а использовать delivery.timeout.msexpires для управления поведением повторной попытки

int

2,15E+09

ssl.key.password

Пароль закрытого ключа в файле keystore. Это необязательно для клиента

password

null

ssl.keystore.location

Расположение закрытого ключа в файле keystore. Это необязательно для клиента

string

null

ssl.keystore.password

Пароль хранилища для файла хранилища ключей (keystore). Это необязательно для клиента и требуется, если используется параметр ssl.keystore.location

password

null

ssl.truststore.location

Расположение файла хранилища доверительных сертификатов (truststore)

string

null

ssl.truststore.password

Пароль для файла хранилища доверительных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, но проверка целостности отключена

password

null

batch.size

При отправке нескольких записей в один раздел производитель попытается объединить записи в меньшее количество запросов. Это повышает производительность как на клиенте, так и на сервере. Этот параметр управляет размером пакета по умолчанию в байтах. Не будет предпринята попытка объединения записей в пакеты большего размера. Запросы, отправленные брокерам, будут содержать несколько пакетов, по одному для каждой партиции с доступными для отправки данными. Небольшой размер пакета сделает дозирование менее распространенным и может уменьшить пропускную способность (нулевой размер пакета полностью отключит дозирование). Очень большой размер пакета может использовать память немного более расточительно, так как всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей

int

16384

client.dns.lookup

Параметр управляет тем, как клиент использует DNS lookup. Если установлено значение use\all\dns\ips, то, когда поиск возвращает несколько IP-адресов для имени хоста, все они будут пытаться подключиться до сбоя подключения. Применяется как к bootstrap, так и к advertised серверам. Если установлено значение resolve\canonical\bootstrap\servers\only, каждая запись будет разрешена и расширена в список канонических имен

string

default

client.id

Строка id для передачи серверу при выполнении запросов. Позволяет отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в лог-файл запросов на стороне сервера

string

«»

connections.max.idle.ms

Закрытие неактивных соединений после количества миллисекунд, указанных для этого параметра

long

540000

delivery.timeout.ms

Верхняя граница времени для отчета об успехе или сбое после получения ответа на вызов send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (acks) (если ожидается) и время, разрешенное для повторных отправок при сбоях. Producer может сообщить об ошибке отправки записи раньше, чем это обозначено в конфигурации, если обнаружена неустранимая ошибка, количество повторных попыток отправки было исчерпано или запись добавлена в пакет, который достиг окончания срока доставки (deadline) раньше. Значение этой конфигурации должно быть больше или равно сумме request.timeout.ms + linger.ms

int

120000

linger.ms

Производитель группирует все записи, поступающие между запросами передачи, в один пакетированный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем они могут быть отправлены. Однако в некоторых случаях клиенту может понадобиться уменьшить количество запросов даже при умеренной нагрузке. Рассматриваемая настройка позволяет это сделать, добавляя небольшое количество искусственной задержки, то есть вместо немедленной отправки записи производитель будет ждать до заданной задержки, чтобы разрешить отправку других записей и отправить их вместе. Это можно рассматривать как аналог алгоритма Нейгла в TCP. Параметр определяет верхнюю границу задержки для дозирования: как только получен batch.size записей для партиции, они будут отправлены немедленно независимо от этого параметра, однако если количество байтов меньше, чем накопленных для этого раздела, будет выполняться задержка в течение указанного времени, ожидая появления большего количества записей. Этот параметр по умолчанию равен 0 (т. е. без задержки). Установка linger.ms=5, например, будет иметь эффект уменьшения количества отправленных запросов, но добавит до 5 мс задержки к записям, отправленным в отсутствие нагрузки

int

0

max.block.ms

Параметр определяет, как долго KafkaProducer.send() и KafkaProducer.partitionsFor() будут блокироваться. Эти методы могут быть заблокированы, так как буфер заполнен или метаданные недоступны. Блокировка в пользовательских сериализаторах или разделителе не будет учитываться в течение этого времени ожидания

long

60000

max.request.size

Максимальный размер запроса в байтах. Этот параметр ограничивает количество партий записей, которые производитель отправляет в одном запросе, чтобы избежать отправки огромных запросов. Это также эффективно ограничивает максимальный размер пакета записи. Обратите внимание, что сервер имеет свой собственный размер пакета записи, который может отличаться от заданного в данном параметре

int

1048576

partitioner.class

Класс разделителя, реализующий org.apache.kafka.clients.producer.Partitioner-интерфейс

class

org.apache.kafka.clients.producer.internals.DefaultPartitioner

receive.buffer.bytes

Размер буфера приема TCP (SO_RCVBUF) для использования при чтении данных. Если значение равно -1, по умолчанию будет использоваться размер буфера, установленный в ОС

int

32768 (32 кБ)

request.timeout.ms

Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны. Значение должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера) для уменьшения возможности дублирования сообщений из-за ненужных попыток производителя

int

30000

sasl.client.callback.handler.class

Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler

class

null

sasl.jaas.config

Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: loginModuleClass controlFlag (optionName=optionValue)\*;. Для брокеров конфигурация должна иметь префикс слушателя и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.jaas.config=com.example.ScramLoginModule required

password

null

sasl.kerberos.service.name

Имя участника (принципала) Kerberos, под которым работает Kafka. Это можно определить либо в конфигурации JAAS Kafka, либо в конфигурации Kafka

string

null

sasl.login.callback.handler.class

Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс AuthenticateCallbackHandler. Для брокеров конфигурация обработчика обратного вызова входа (login callback handler) должна иметь префикс прослушивателя (listener) и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class

null

sasl.login.class

Полное имя класса, который определяет интерфейс входа. Для брокеров конфигурация входа должна быть с префиксом listener’а и именем механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin

class

null

sasl.mechanism

Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого доступен поставщик безопасности. GSSAPI является механизмом по умолчанию

string

GSSAPI

security.protocol

Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL

string

PLAINTEXT

send.buffer.bytes

Размер буфера отправки TCP (SO_SNDBUF) для использования при отправке данных. Если значение равно -1, будет использоваться ОС по умолчанию

int

131072

ssl.enabled.protocols

Список протоколов, разрешенных для SSL-соединений

list

TLSv1.3,TLSv1.2

ssl.keystore.type

Формат файла хранилища ключей (key store). Параметр необязателен для клиента

string

JKS

ssl.protocol

Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимые значения: TLSv1.2 и TLSv1.3. SSL, SSLv2 и SSLv3 могут поддерживаться в старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности

string

TLS

ssl.provider

Имя поставщика безопасности, используемого для SSL-соединений. Значение по умолчанию — значение, установленное по умолчанию в JVM

string

null

ssl.truststore.type

Формат файла хранилища доверия (trust store)

string

JKS

enable.idempotence

Если задано значение true, производитель гарантирует, что в поток записывается ровно одна копия каждого сообщения. Если false, производитель повторяет попытку из-за сбоев брокера и т. д., может писать дубликаты повторенного сообщения в потоке. Обратите внимание, что для включения идемпотенции требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5, количество попыток переотправки (retries) было больше 0, а ack=all. Если эти значения явно не заданы пользователем, будут выбраны подходящие значения. Если установлены несовместимые значения, будет создано исключение ConfigException

boolean

false

interceptor.classes

Список классов для использования в качестве перехватчиков. Реализовывает org.apache.kafka.clients.producer.ProducerInterceptor-интерфейс, позволяющий перехватывать (и, возможно, изменять) записи, полученные производителем до их публикации в кластере Kafka. По умолчанию перехватчиков нет

list

«»

max.in.flight.requests.per.connection

Максимальное количество неподтвержденных запросов, отправляемых клиентом по одному соединению перед блокировкой. Если этот параметр больше 1 и есть неудачные отправки, существует риск переупорядочения сообщений из-за повторных попыток (т. е. если повторные попытки включены)

int

5

metadata.max.age.ms

Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений в руководстве разделов, чтобы проактивно обнаружить новые брокеры или разделы

long

300000

metric.reporters

Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporter интерфейса позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX

list

«»

metrics.num.samples

Количество выборок, сохраняемых для вычисления метрик

int

2

metrics.recording.level

Самый высокий уровень записи для метрик

string

INFO

metrics.sample.window.ms

Окно времени, в котором вычисляется выборка метрик

long

30000

reconnect.backoff.max.ms

Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного джиттера; это позволяет избежать избыточного количества подключений, которое ведет к деградации производительности брокера

long

1000

reconnect.backoff.ms

Базовое время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру

long

50

retry.backoff.ms

Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя

long

100

sasl.kerberos.kinit.cmd

Путь к команде Kerberos kinit

string

/usr/bin/kinit

sasl.kerberos.min.time.before.relogin

Время сна потока входа между попытками обновления

long

60000

sasl.kerberos.ticket.renew.jitter

Процент случайного jitter, добавленного к времени обновления

double

0.05

sasl.kerberos.ticket.renew.window.factor

Определяет окно времени в процентах от срока действия билета безопасности Kerberos. Когда остаток времени до истечения срока действия билета достигает определенного процента, заданного данным параметром, сеансы аутентификации могут попытаться обновить билет Kerberos

double

0.8

sasl.login.refresh.buffer.seconds

Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER

short

300

sasl.login.refresh.min.period.seconds

Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER

short

60

sasl.login.refresh.window.factor

Поток обновления входа будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%). В настоящее время применяется только к носителю OAUTHBEARER

double

0.8

sasl.login.refresh.window.jitter

Максимальное количество случайного джиттера относительно времени жизни учетных данных, добавленного ко времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно. Значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER

double

0.05

ssl.cipher.suites

Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров

list

null

ssl.endpoint.identification.algorithm

Алгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера

string

https

ssl.keymanager.algorithm

Алгоритм, используемый фабрикой key manager для SSL-соединений. Значение по умолчанию — это алгоритм фабрики Key manager, настроенный для виртуальной машины Java

string

SunX509

ssl.secure.random.implementation

Реализация SecureRandom PRNG, используемая для операций шифрования SSL

string

null

ssl.trustmanager.algorithm

Алгоритм, используемый фабрикой trust manager для SSL-соединений. Значение по умолчанию — это заводской алгоритм trust manager, настроенный для виртуальной машины Java

string

PKIX

transaction.timeout.ms

Максимальное время в миллисекундах, в течение которого координатор транзакций будет ждать обновления статуса транзакции от производителя, прежде чем проактивно прервать текущую транзакцию. Если это значение больше, чем transaction.max.timeout.ms в брокере, запрос завершится ошибкой InvalidTransactionTimeout

int

60000

transactional.id

Идентификатор транзакции, используемый для доставки транзакций. Это включает семантику надежности, которая охватывает несколько сеансов производителя, так как это позволяет клиенту гарантировать, что транзакции с использованием того же TransactionalId были завершены до начала любых новых транзакций. Если идентификатор транзакции не указан, производитель ограничивается идемпотентной доставкой. Обратите внимание, что enable.idempotencemust должна быть включена, если настроен идентификатор транзакции. Значение по умолчанию равно null, что означает невозможность использования транзакций. По умолчанию для транзакций требуется кластер по крайней мере из трех брокеров, что является рекомендуемым параметром для производства; для разработки можно изменить это, настроив параметр брокера transaction.state.log.replication.factor

string

null