Конфигурирование кластера Corax#
Конфигурирование кластера серверов Corax, как и конфигурирование одного узла, осуществляется в файле server.properties на каждом сервере, который планируется включить в кластер.
Минимальная конфигурация кластера#
Пример конфигурационного файла server.properties для кластера Corax из двух брокеров при установке по профилю no-auth (приведен пример конфигурации для одного из брокеров):
broker.id=1
listeners=PLAINTEXT://10.XX.XX.XX:YYYY
zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYY
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs
Шаги создания конфигурационного файла:
Присвойте ключу
broker.idнеобходимый числовой идентификатор.Необходимо соблюдать уникальность данного идентификатора на всех серверах кластера.
Пример
На первом сервере Corax:
broker.id=1На втором сервере:
broker.id=2Присвойте ключу
listenersIP-адрес сервера.Пример
listeners=PLAINTEXT://10.XX.XX.XX:YYYYЕсли на одном физическом сервере планируется запускать несколько серверов Corax, то для каждого сервера необходимо также указать уникальный порт, заданный в ключе
listeners. По умолчанию сервер Corax слушает порт 9092.Для ключа
zookeeper.connectукажите IP и PORT сервиса ZooKeeper в форматеzookeeper.connect=IP:PORT. Если кластер ZooKeeper запущен, то укажите адреса ZooKeeper-сервисов через запятую:Пример
zookeeper.connect=10.XX.XX.XX:YYYY,10.XX.XX.XX:YYYYДля ключа
log.dirsзадайте директорию, в которой планируется хранение сообщений, передаваемых в Corax.Пример
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logsПримечание
Если планируется использовать Corax под большой нагрузкой, то должно быть выделено достаточно места, т. к. все сообщения будут сохраняться на диск именно в указанной директории.
Если на физическом сервере возможно использование нескольких физических жестких дисков для хранения сообщений Corax, то для увеличения производительности перечислите пути к директориям на этих дисках через запятую:
log.dirs=/var/log/kafka_2.11-0.9.0.0/kafka-logs,/another_disk/kafka_2.11-0.9.0.0/kafka-logs
На данном этапе минимальная конфигурация узлов кластера Corax завершается. Если не существует каких-либо ошибок в конфигурации, а также ZooKeeper сконфигурирован корректно и запущен, то этого вполне достаточно для успешного запуска и работы кластера Corax.
Расширенная конфигурация кластера#
Конфигурирование производителя (producer)#
Минимальная конфигурация
Пример конфигурационного файла producer.properties для одного брокера (в примере в качестве хоста производителя указан текущий хост — hostname -f):
acks=1
bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZ
Шаги создания конфигурационного файла:
Присвойте ключу
acksзначение, указывающее, что сообщение отправлено:0 — сообщение расположено в TCP-буфере для отправки по сети;
1 — сообщение успешно записано в партицию-лидер;
1 (all) — сообщение успешно записано на все
ISR(InSyncReplicasпараметр на Kafka broker(server.properties))
Пример
acks=1Для ключа
bootstrap.serversукажитеIP:PORTсервиса Corax.Пример
Если запущены 3 экземпляра Corax на том же сервере:
bootstrap.servers=`hostname -f`:YYYY, `hostname -f`:UUUU, `hostname -f`:ZZZZДопускается также указывать IP-адреса, полные доменные имена.
Расширенная конфигурация
Существует ряд опциональных параметров, которые могут быть включены в файл конфигурации. Полный перечень параметров приведен в таблице ниже.
Запуск каждого отдельного производителя Corax выполняется командой:
./bin/kafka-console-producer --bootstrap-server `hostname -f`:9101, `hostname -f`:9102, `hostname -f`:9103 --topic test1 --producer.config etc/kafka/producer.properties
Название |
Описание |
Тип |
Значение по умолчанию |
|---|---|---|---|
key.serializer |
Класс сериализатора для ключа, реализующего org.apache.kafka.common.serialization.Serializerinterface |
class |
Неприменимо |
value.serializer |
Класс сериализатора для значения, реализующего org.apache.kafka.common.serialization.Serializerinterface |
class |
Неприменимо |
acks |
Количество подтверждений, которые производитель должен получить от лидера, прежде чем считать запрос завершенным. Это определяет долговечность записей, которые будут отправлены. Разрешены следующие настройки: |
string |
all |
bootstrap.servers |
Список пар «хост: порт», используемых для установления начального соединения с кластером Corax. Клиент будет использовать все серверы независимо от того, какие серверы указаны здесь для начальной загрузки. Этот список влияет только на начальные хосты, используемые для обнаружения полного набора серверов. Этот список должен быть в виде: host1:port1, host2:port2, … Поскольку эти серверы используются только для первоначального подключения, чтобы обнаружить полное членство в кластере (которое может изменяться динамически), этот список не должен содержать полный набор серверов (можно указать более одного, на случай, если указанный сервер не работает) |
list |
«» |
buffer.memory |
Общее количество байтов памяти, которое производитель может использовать для буферизации записей, ожидающих отправки на сервер. Если записи отправляются быстрее, чем они могут быть доставлены на сервер, производитель блокируется на время, заданное параметром max.block.ms, после чего он выдаст исключение. Этот параметр должен примерно соответствовать общей памяти, которую будет использовать производитель, но не является жесткой привязкой, поскольку не вся память, используемая производителем, используется для буферизации. Некоторая дополнительная память будет использоваться для сжатия (если сжатие включено), а также для поддержания inflight-запросов |
long |
33554432 |
compression.type |
Тип сжатия для всех данных, генерируемых производителем. Допустимые значения: none (без сжатия), gzip, snappy, lz4 или zstd. Сжатие полных партий данных, поэтому эффективность разбивки на порции данных также повлияет на степень сжатия (большее дозирование означает лучшее сжатие) |
string |
none |
retries |
Установка значения больше нуля будет означать для клиента переотправку любой записи, отправка которой завершилась предположительно временной ошибкой. Эта попытка не отличается от случая, когда клиент получает ошибку. Разрешение повторов (retries) без установки параметра max.in.flight.requests.per.connection=1 потенциально изменит порядок записей, т. к. если два пакета отправляются в один раздел и первый пакет вызывает сбой и повторяется, но второй успешно записан, то запись о втором пакете может появиться первой. Запросы на запись будут завершены ошибкой до того, как количество попыток будет исчерпано, а если тайм-аут настроен, delivery.timeout.msexpires будет достигнут до успешного подтверждения. Пользователи обычно предпочитают не указывать этот параметр, а использовать delivery.timeout.msexpires для управления поведением повторной попытки |
int |
2,15E+09 |
ssl.key.password |
Пароль закрытого ключа в файле keystore. Это необязательно для клиента |
password |
null |
ssl.keystore.location |
Расположение закрытого ключа в файле keystore. Это необязательно для клиента |
string |
null |
ssl.keystore.password |
Пароль хранилища для файла хранилища ключей (keystore). Это необязательно для клиента и требуется, если используется параметр ssl.keystore.location |
password |
null |
ssl.truststore.location |
Расположение файла хранилища доверительных сертификатов (truststore) |
string |
null |
ssl.truststore.password |
Пароль для файла хранилища доверительных сертификатов (truststore). Если пароль не задан, доступ к truststore по-прежнему доступен, но проверка целостности отключена |
password |
null |
batch.size |
При отправке нескольких записей в один раздел производитель попытается объединить записи в меньшее количество запросов. Это повышает производительность как на клиенте, так и на сервере. Этот параметр управляет размером пакета по умолчанию в байтах. Не будет предпринята попытка объединения записей в пакеты большего размера. Запросы, отправленные брокерам, будут содержать несколько пакетов, по одному для каждой партиции с доступными для отправки данными. Небольшой размер пакета сделает дозирование менее распространенным и может уменьшить пропускную способность (нулевой размер пакета полностью отключит дозирование). Очень большой размер пакета может использовать память немного более расточительно, так как всегда будет выделяться буфер указанного размера пакета в ожидании дополнительных записей |
int |
16384 |
client.dns.lookup |
Параметр управляет тем, как клиент использует DNS lookup. Если установлено значение use\all\dns\ips, то, когда поиск возвращает несколько IP-адресов для имени хоста, все они будут пытаться подключиться до сбоя подключения. Применяется как к bootstrap, так и к advertised серверам. Если установлено значение resolve\canonical\bootstrap\servers\only, каждая запись будет разрешена и расширена в список канонических имен |
string |
default |
client.id |
Строка id для передачи серверу при выполнении запросов. Позволяет отслеживать источник запросов за пределами только ip/port, позволяя строковое имя логического приложения включать в лог-файл запросов на стороне сервера |
string |
«» |
connections.max.idle.ms |
Закрытие неактивных соединений после количества миллисекунд, указанных для этого параметра |
long |
540000 |
delivery.timeout.ms |
Верхняя граница времени для отчета об успехе или сбое после получения ответа на вызов send(). Это ограничивает общее время задержки записи перед отправкой, время ожидания подтверждения от брокера (acks) (если ожидается) и время, разрешенное для повторных отправок при сбоях. Producer может сообщить об ошибке отправки записи раньше, чем это обозначено в конфигурации, если обнаружена неустранимая ошибка, количество повторных попыток отправки было исчерпано или запись добавлена в пакет, который достиг окончания срока доставки (deadline) раньше. Значение этой конфигурации должно быть больше или равно сумме request.timeout.ms + linger.ms |
int |
120000 |
linger.ms |
Производитель группирует все записи, поступающие между запросами передачи, в один пакетированный запрос. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем они могут быть отправлены. Однако в некоторых случаях клиенту может понадобиться уменьшить количество запросов даже при умеренной нагрузке. Рассматриваемая настройка позволяет это сделать, добавляя небольшое количество искусственной задержки, то есть вместо немедленной отправки записи производитель будет ждать до заданной задержки, чтобы разрешить отправку других записей и отправить их вместе. Это можно рассматривать как аналог алгоритма Нейгла в TCP. Параметр определяет верхнюю границу задержки для дозирования: как только получен batch.size записей для партиции, они будут отправлены немедленно независимо от этого параметра, однако если количество байтов меньше, чем накопленных для этого раздела, будет выполняться задержка в течение указанного времени, ожидая появления большего количества записей. Этот параметр по умолчанию равен 0 (т. е. без задержки). Установка linger.ms=5, например, будет иметь эффект уменьшения количества отправленных запросов, но добавит до 5 мс задержки к записям, отправленным в отсутствие нагрузки |
int |
0 |
max.block.ms |
Параметр определяет, как долго KafkaProducer.send() и KafkaProducer.partitionsFor() будут блокироваться. Эти методы могут быть заблокированы, так как буфер заполнен или метаданные недоступны. Блокировка в пользовательских сериализаторах или разделителе не будет учитываться в течение этого времени ожидания |
long |
60000 |
max.request.size |
Максимальный размер запроса в байтах. Этот параметр ограничивает количество партий записей, которые производитель отправляет в одном запросе, чтобы избежать отправки огромных запросов. Это также эффективно ограничивает максимальный размер пакета записи. Обратите внимание, что сервер имеет свой собственный размер пакета записи, который может отличаться от заданного в данном параметре |
int |
1048576 |
partitioner.class |
Класс разделителя, реализующий org.apache.kafka.clients.producer.Partitioner-интерфейс |
class |
org.apache.kafka.clients.producer.internals.DefaultPartitioner |
receive.buffer.bytes |
Размер буфера приема TCP (SO_RCVBUF) для использования при чтении данных. Если значение равно -1, по умолчанию будет использоваться размер буфера, установленный в ОС |
int |
32768 (32 кБ) |
request.timeout.ms |
Параметр управляет максимальным временем ожидания клиентом ответа на запрос. Если ответ не получен до истечения тайм-аута, клиент при необходимости повторно отправит запрос или не выполнит запрос, если повторные попытки исчерпаны. Значение должно быть больше, чем replica.lag.time.max.ms (конфигурация брокера) для уменьшения возможности дублирования сообщений из-за ненужных попыток производителя |
int |
30000 |
sasl.client.callback.handler.class |
Полное имя класса обработчика обратного вызова клиента SASL, реализующего интерфейс AuthenticateCallbackHandler |
class |
null |
sasl.jaas.config |
Параметры контекста входа (login) JAAS для SSL-соединений в формате, используемом файлами конфигурации JAAS. Формат файла конфигурации JAAS описан здесь. Формат значения: |
password |
null |
sasl.kerberos.service.name |
Имя участника (принципала) Kerberos, под которым работает Kafka. Это можно определить либо в конфигурации JAAS Kafka, либо в конфигурации Kafka |
string |
null |
sasl.login.callback.handler.class |
Полное имя класса обработчика обратного вызова входа SASL, реализующего интерфейс AuthenticateCallbackHandler. Для брокеров конфигурация обработчика обратного вызова входа (login callback handler) должна иметь префикс прослушивателя (listener) и имя механизма SASL в нижнем регистре. Например: listener.name.sasl\sasl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class |
null |
sasl.login.class |
Полное имя класса, который определяет интерфейс входа. Для брокеров конфигурация входа должна быть с префиксом listener’а и именем механизма SASL в нижнем регистре. Например: listener.name.sasl\ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin |
class |
null |
sasl.mechanism |
Механизм SASL, используемый для клиентских подключений. Это может быть любой механизм, для которого доступен поставщик безопасности. GSSAPI является механизмом по умолчанию |
string |
GSSAPI |
security.protocol |
Протокол, используемый для связи с брокерами. Допустимые значения: PLAINTEXT, SASL, SASL\PLAINTEXT, SASL\SSL |
string |
PLAINTEXT |
send.buffer.bytes |
Размер буфера отправки TCP (SO_SNDBUF) для использования при отправке данных. Если значение равно -1, будет использоваться ОС по умолчанию |
int |
131072 |
ssl.enabled.protocols |
Список протоколов, разрешенных для SSL-соединений |
list |
TLSv1.3,TLSv1.2 |
ssl.keystore.type |
Формат файла хранилища ключей (key store). Параметр необязателен для клиента |
string |
JKS |
ssl.protocol |
Протокол SSL, используемый для создания SSLContext. По умолчанию — TLS, который подходит для большинства случаев. Допустимые значения: TLSv1.2 и TLSv1.3. SSL, SSLv2 и SSLv3 могут поддерживаться в старых JVMs, но их использование не рекомендуется из-за известных уязвимостей безопасности |
string |
TLS |
ssl.provider |
Имя поставщика безопасности, используемого для SSL-соединений. Значение по умолчанию — значение, установленное по умолчанию в JVM |
string |
null |
ssl.truststore.type |
Формат файла хранилища доверия (trust store) |
string |
JKS |
enable.idempotence |
Если задано значение true, производитель гарантирует, что в поток записывается ровно одна копия каждого сообщения. Если false, производитель повторяет попытку из-за сбоев брокера и т. д., может писать дубликаты повторенного сообщения в потоке. Обратите внимание, что для включения идемпотенции требуется, чтобы max.in.flight.requests.per.connection было меньше или равно 5, количество попыток переотправки (retries) было больше 0, а ack=all. Если эти значения явно не заданы пользователем, будут выбраны подходящие значения. Если установлены несовместимые значения, будет создано исключение ConfigException |
boolean |
false |
interceptor.classes |
Список классов для использования в качестве перехватчиков. Реализовывает org.apache.kafka.clients.producer.ProducerInterceptor-интерфейс, позволяющий перехватывать (и, возможно, изменять) записи, полученные производителем до их публикации в кластере Kafka. По умолчанию перехватчиков нет |
list |
«» |
max.in.flight.requests.per.connection |
Максимальное количество неподтвержденных запросов, отправляемых клиентом по одному соединению перед блокировкой. Если этот параметр больше 1 и есть неудачные отправки, существует риск переупорядочения сообщений из-за повторных попыток (т. е. если повторные попытки включены) |
int |
5 |
metadata.max.age.ms |
Период времени в миллисекундах, после которого необходимо обновить метаданные, даже если нет каких-либо изменений в руководстве разделов, чтобы проактивно обнаружить новые брокеры или разделы |
long |
300000 |
metric.reporters |
Список классов для использования в качестве репортеров метрик. Реализация org.apache.kafka.common.metrics.MetricsReporter интерфейса позволяет подключать классы, которые будут уведомлены о создании новой метрики. JmxReporter всегда включен для регистрации статистики JMX |
list |
«» |
metrics.num.samples |
Количество выборок, сохраняемых для вычисления метрик |
int |
2 |
metrics.recording.level |
Самый высокий уровень записи для метрик |
string |
INFO |
metrics.sample.window.ms |
Окно времени, в котором вычисляется выборка метрик |
long |
30000 |
reconnect.backoff.max.ms |
Максимальное время ожидания в миллисекундах при повторном подключении к брокеру, которому неоднократно не удавалось подключиться. Если это предусмотрено, время переподключения на хост будет увеличиваться экспоненциально для каждого последовательного сбоя соединения, до этого максимума. После расчета увеличения времени переподключения добавляется 20% случайного джиттера; это позволяет избежать избыточного количества подключений, которое ведет к деградации производительности брокера |
long |
1000 |
reconnect.backoff.ms |
Базовое время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к хосту в узком цикле. Это отступление применяется ко всем попыткам подключения клиента к брокеру |
long |
50 |
retry.backoff.ms |
Время ожидания перед попыткой повторить неудачный запрос к данному разделу темы. Это позволяет избежать повторной отправки запросов в узком цикле при некоторых сценариях сбоя |
long |
100 |
sasl.kerberos.kinit.cmd |
Путь к команде Kerberos kinit |
string |
/usr/bin/kinit |
sasl.kerberos.min.time.before.relogin |
Время сна потока входа между попытками обновления |
long |
60000 |
sasl.kerberos.ticket.renew.jitter |
Процент случайного jitter, добавленного к времени обновления |
double |
0.05 |
sasl.kerberos.ticket.renew.window.factor |
Определяет окно времени в процентах от срока действия билета безопасности Kerberos. Когда остаток времени до истечения срока действия билета достигает определенного процента, заданного данным параметром, сеансы аутентификации могут попытаться обновить билет Kerberos |
double |
0.8 |
sasl.login.refresh.buffer.seconds |
Количество времени буфера до истечения срока действия учетных данных для поддержания при обновлении учетных данных в секундах. Если обновление произойдет ближе к истечению, чем количество секунд буфера, обновление будет перемещено, чтобы сохранить как можно больше времени буфера. Допустимые значения находятся в диапазоне от 0 до 3600 (1 час); значение по умолчанию — 300 (5 минут). Это значение и значение sasl.login.refresh.min.period.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER |
short |
300 |
sasl.login.refresh.min.period.seconds |
Требуемое минимальное время ожидания потока обновления имени входа перед обновлением учетных данных в секундах. Допустимые значения находятся в диапазоне от 0 до 900 (15 минут); если значение не указано, используется значение по умолчанию 60 (1 минута). Это значение и значение sasl.login.refresh.buffer.seconds игнорируются, если их сумма превышает оставшееся время жизни учетных данных. В настоящее время применяется только к носителю OAUTHBEARER |
short |
60 |
sasl.login.refresh.window.factor |
Поток обновления входа будет спать до тех пор, пока не будет достигнут указанный коэффициент окна относительно времени жизни учетных данных, после чего он попытается обновить учетные данные. Допустимые значения: от 0,5 (50%) до 1,0 (100%) включительно; значение по умолчанию — 0.8 (80%). В настоящее время применяется только к носителю OAUTHBEARER |
double |
0.8 |
sasl.login.refresh.window.jitter |
Максимальное количество случайного джиттера относительно времени жизни учетных данных, добавленного ко времени сна потока обновления входа. Допустимые значения: от 0 до 0,25 (25%) включительно. Значение по умолчанию — 0,05 (5%). В настоящее время применяется только к OAUTHBEARER |
double |
0.05 |
ssl.cipher.suites |
Список шифровальных наборов. Это именованная комбинация аутентификации, шифрования, MAC и алгоритма обмена ключами, используемая для согласования параметров безопасности для сетевого подключения с использованием сетевого протокола TLS или SSL. По умолчанию поддерживаются все доступные наборы шифров |
list |
null |
ssl.endpoint.identification.algorithm |
Алгоритм идентификации конечной точки для проверки имени хоста сервера с помощью сертификата сервера |
string |
https |
ssl.keymanager.algorithm |
Алгоритм, используемый фабрикой key manager для SSL-соединений. Значение по умолчанию — это алгоритм фабрики Key manager, настроенный для виртуальной машины Java |
string |
SunX509 |
ssl.secure.random.implementation |
Реализация SecureRandom PRNG, используемая для операций шифрования SSL |
string |
null |
ssl.trustmanager.algorithm |
Алгоритм, используемый фабрикой trust manager для SSL-соединений. Значение по умолчанию — это заводской алгоритм trust manager, настроенный для виртуальной машины Java |
string |
PKIX |
transaction.timeout.ms |
Максимальное время в миллисекундах, в течение которого координатор транзакций будет ждать обновления статуса транзакции от производителя, прежде чем проактивно прервать текущую транзакцию. Если это значение больше, чем transaction.max.timeout.ms в брокере, запрос завершится ошибкой InvalidTransactionTimeout |
int |
60000 |
transactional.id |
Идентификатор транзакции, используемый для доставки транзакций. Это включает семантику надежности, которая охватывает несколько сеансов производителя, так как это позволяет клиенту гарантировать, что транзакции с использованием того же TransactionalId были завершены до начала любых новых транзакций. Если идентификатор транзакции не указан, производитель ограничивается идемпотентной доставкой. Обратите внимание, что enable.idempotencemust должна быть включена, если настроен идентификатор транзакции. Значение по умолчанию равно null, что означает невозможность использования транзакций. По умолчанию для транзакций требуется кластер по крайней мере из трех брокеров, что является рекомендуемым параметром для производства; для разработки можно изменить это, настроив параметр брокера transaction.state.log.replication.factor |
string |
null |