Руководство по системному администрированию#

Термины и сокращения#

Термин/Аббревиатура

Определение

SEDR

Четырехбуквенный код программного компонента Сервис междоменной репликации событий из состава программного продукта Platform V Synapse Streaming Event Processing

TLS

Transport Layer Security - криптографический протокол, обеспечивающией защищенную передачу данных между узлами

JKS

Java Keystore - хранилище сертификатов открытых ключей и авторизации, которое используется приложениями на основе Java для шифрования, аутентификации и установки соединений HTTPS

jq

утилита для обработки JSON в командной строке

JMX

Java Management Extensions — технология Java, предназначенная для мониторинга и управления (в т.ч. удаленно) различными объектами (ресурсами): приложениями, устройствами, сетями

Jenkins

Программная система для обеспечения процесса непрерывной интеграции программного обеспечения

Nexus

Nexus репозиторий

worker\воркер

Часть технического сервиса междоменной репликации событий (SEDR). Определяет направление репликации событий - домен-источник и домен назначения, в том числе при нахождении доменов в разных сетевых зонах (сегментах). Для каждого направления репликации необходим отдельный worker. В рамках каждого воркера может быть развернуто необходимое количество коннекторов

Сценарии администрирования#

Предусловия

  1. Наличие сертификатов TLS для администратора. Перед началом действий поместить сертификаты в домашнюю директорию на сервере, где расположен техсервис передачи событий.

  2. Для выполнения административных действий без использования скриптов автоматизации необходимо сконвертировать сертификаты в формат pem. Для JKS, выполнить команды:

keytool -importkeystore -deststoretype PKCS12 -noprompt -srckeystore <jks SEDR> -srcstorepass '<пароль хранилища источника>' -destkeystore tmp.pem -deststorepass '<пароль создаваемого хранилища>'

openssl pkcs12 -in tmp.pem -passin pass:'<пароль хранилища>' -out <наименование экземпляра техсервиса>.pem -clcerts -nodes

Создание архивных копий компонентов SEDR#

Предусловия: Необходимо заполнить параметры в конфигурационном файле vars.yml.

  • в разделе replicator:

    • backup_installdir_to: — директория создания архивных копий для директории установки SEDR.

Ручной способ

На сервере, с которого производилась установка выполнить команду:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t backup -l <host>

, где ID - имя созданного inventory, host - доп. параметр для указания конкретных хостов, если требуется выполнение на конкретных хостах.

При помощи Jenkins

Для создания архивной копии директории установки EVTP необходимо запустить задание Jenkins SYN_custom_replicator (поставляется в дистрибутиве) с выбором playbook replicator.yml c тегом backup. Результатом выполнения задачи Jenkins является архив с названием replicator_backup.tar.gz, расположенный в директории, указанной в параметрах конфигурационного файла vars.yml.

Созданные архивные копии хранятся в директории в единичном экземпляре для предотвращения переполнения директории хранения архивов. При попытке создания архивной копии повторно выполнение задания Jenkins будет прерываться, если в указанной директории уже создан ранее архив. Поэтому необходимо перед созданием нового архива предварительно удалить существующие архивные копии из данной директории. Для этого необходимо запустить задание Jenkins SYN_custom_replicator с выбором playbook replicator.yml с тегом backup_remove.

Теги backup_remove и backup можно совмещать в одном запуске задания Jenkins для экономии времени.

Восстановление из архивных копий компонентов SEDR#

Предусловия: Необходимо заполнить параметры в конфигурационном файле vars.yml.

  • в разделе replicator:

    • backup_installdir_to: — директория создания архивных копий для директории установки SEDR.

Ручной способ

На сервере, с которого производилась установка выполнить команду:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t backup_restore -l <host>

, где ID - имя созданного inventory, host - доп. параметр для указания конкретных хостов, если требуется выполнение на конкретных хостах.

При помощи Jenkins

Для восстановления SEDR из архивной копии необходимо запустить задание Jenkins SYN_custom_replicator (поставляется в дистрибутиве) с выбором playbook replicator.yml c тегом backup_restore. Результатом будет являться восстановление директории установки SEDR из архива, помещенного в указанную в файле vars.yml директорию.

Получение списка коннекторов#

При помощи Jenkins

Для получения списка коннекторов с помощью Jenkins используйте задание SYN_custom_replicator:

  • с выбором playbook replicator.yml с тегом status (покажет состояние и список коннекторов);

Настраиваемые параметры:

  • inventory - выбрать кластер, в котором необходимо получить список коннекторов SEDR.

Ручной способ

curl -v --cert ./<наименование экземпляра техсервиса>.pem --key ./<наименование экземпляра техсервиса>.pem -k https://localhost:8090/connectors

Получение статуса коннекторов#

При помощи Jenkins

Для получения статуса коннекторов с помощью Jenkins используйте задание SYN_custom_replicator:

  • с выбором playbook replicator.yml с тегом status (покажет состояние и список коннекторов);

Настраиваемые параметры:

  • inventory - выбрать кластер, в котором необходимо получить статус коннекторов SEDR.

Ручной способ

curl -v --cert ./<наименование экземпляра техсервиса>.pem --key ./<наименование экземпляра техсервиса>.pem -k https://localhost:8090/connectors/<наименование коннектора>/status

Запуск коннектора#

При помощи Jenkins

Для запуска коннектора с помощью Jenkins используйте в папке Release задание Jenkins replicator_install:

  • с выбором тега start_connector (запустит коннектор выбранной версии (из nexus) в выбранном кластере);

Настраиваемые параметры:

  • inventory - выбрать кластер, для которого необходимо запустить коннектор;

  • nexus_version - выбираем версию дистрибутива репликатора из nexus;

  • tasks - указать количество запускаемых коннекторов (по умолчанию равен 2).

Ручной способ

curl -v --cert ./<наименование экземпляра техсервиса>.pem --key ./<наименование экземпляра техсервиса>.pem -k -X POST --data '<JSON описание конфигурации>' https://localhost:8090/connectors/

Пример конфигурации:

{
"name": "<наименование коннектора>", 
"config": 
    {
    "connector.class":"ru.sbt.esb.atm.sink.SbtKafkaSinkConnector", 
    "tasks.max":"<количество задач>", 
    "topics":"<топик-источник>",
    "output.topic": "<топик-получатель>",
    "header.converter": "ru.sbt.esb.atm.converter.ByteArrayHeaderConverter",
	"producer.properties": "config/<наименование экземпляра техсервиса>.properties",
    "ha.producer.enabled":"false"
    }
}

Остановка коннектора#

При помощи Jenkins

Для остановки коннектора с помощью Jenkins используйте в папке Release задание replicator_install:

  • с выбором тега stop_connector (запустит коннектор выбранной версии (из nexus) в выбранном кластере);

Настраиваемые параметры:

  • inventory - выбрать кластер, для которого необходимо остановить коннектор;

  • nexus_version - выбираем версию дистрибутива репликатора из nexus;

  • tasks - указать количество коннекторов (по умолчанию равен 2).

Ручной способ

curl -v --cert ./<наименование экземпляра техсервиса>.pem --key ./<наименование экземпляра техсервиса>.pem -k -X DELETE https://localhost:8090/connectors/<наименование коннектора>

Перезапуск коннектора#

При помощи Jenkins

Для перезапуска коннектора с помощью Jenkins используйте в папке Release задание replicator_install:

  • с выбором тега stop_connector, start_connector (перезапустит коннектор выбранной версии (из nexus) в выбранном кластере);

Настраиваемые параметры:

  • inventory - выбрать кластер, для которого необходимо перезапустить коннектор;

  • nexus_version - выбираем версию дистрибутива репликатора из nexus;

  • tasks - указать количество коннекторов (по умолчанию равен 2).

Ручной способ

curl -v --cert ./<наименование экземпляра техсервиса>.pem --key ./<наименование экземпляра техсервиса>.pem -k -X POST https://localhost:8090/connectors/<наименование коннектора>/restart

Автоматический перезапуск задач (tasks) коннектора#

Если задачи коннектора находятся в нерабочем состоянии, т.е. имеют статус, отличный от running, скрипт tasks_restart.sh, созданный в процессе установки воркера, перезапускает задачи коннектора.

Данный скрипт перезапускает нерабочую задачу, дополнительно логируя событие перезапуска задачи в лог с именем воркера. Например, tasks_restart_<имя воркера>.log. В случае, если <имя воркера> представляет из себя synapse_replicator, то файл лога будет называться "tasks_restart_synapse_replicator.log".

Для работы этого скрипта необходимо предустановить пакет jq. В случае отсутствия этого пакета, установка воркера завершится с ошибкой.

Задача на проверку состояния задач коннектора выполняется службой CRON параллельно для всех воркеров каждую минуту.

Создание конфигурационных дистрибутивов#

Ручной способ

Заполнить файл vars.yml, блок replicator.connectors:

replicator:
  connectors:
    - topics: <топик источник>
      output.topic: <топик назначения>
      worker: <имя воркера из блока replicator.workers>
      name: <имя коннектора>
      tasks: 2 # параллелизм

При помощи Jenkins

Создание конфигурационных дистрибутивов осуществляется с помощью задания Jenkins replicator_config_create.

Необходимо заполнить параметры:

  • NexusUrl — url-ссылка на папку, в которую необходимо поместить дистрибутив;

  • NexusCred — ID креденшнл для публикации дистрибутива в указанной папке Nexus.

При запуске задания Jenkins с указанными параметрами дистрибутив SEDR будет создан в указанной папке.

Перезапуск компонентов SEDR#

При помощи Jenkins

Для перезапуска воркера с помощью Jenkins используйте задание SYN_custom_replicator:

  • с выбором playbook replicator.yml и тегов stop, start (для перезапуска всех воркеров на выбранном контуре);

  • для перезапуска конкретного воркера помимо указанных тегов следует дополнительно включить тег run_only и в разделе usedWorker указать те воркеры, которые необходимо перезапустить.

Настраиваемые параметры:

  • inventory - выбрать кластер, для которого необходимо перезапустить компоненты;

  • nexus_version - выбираем версию дистрибутива репликатора из nexus;

  • only_on_host - отметить галочками нужные хост(ы) из списка (список соответствует выбранному inventory), если необходимо перезапустить компоненты только для данного хоста(ов) выбранного кластера;

  • install_all_hosts - выбрать параметр, если необходимо перезапустить все хосты из данного inventory.

Если не выбран ни один из параметров only_on_host, install_all_hosts выполнение задания Jenkins прервется с ошибкой Не выбраны хосты.

Ручной способ

На сервере, с которого производилась установка выполнить команду:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t start,stop -l <host>

, где ID - имя созданного inventory, host - доп. параметр для указания конкретных хостов, если требуется перезапуск на конкретных хостах.

Получение списка коннекторов и их статуса#

При помощи Jenkins

Для получения статуса коннекторов с помощью Jenkins используйте задание SYN_custom_replicator:

  • с выбором playbook replicator.yml с тегом status (покажет состояние и список коннекторов);

Настраиваемые параметры:

  • inventory - выбрать кластер, в котором необходимо получить статус коннекторов SEDR.

Ручной способ

На сервере, с которого производилась установка выполнить команду:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t status

где ID - имя созданного inventory.

Установка и перезапуск коннекторов SEDR с помощью Jenkins#

Ручной способ

Предварительно необходимо заполнить файл vars.yml, блок replicator.connectors:

replicator:
  connectors:
    - topics: <топик источник>
      output.topic: <топик назначения>
      worker: <имя воркера из блока replicator.workers>
      name: <ИМЯ КОННЕТОРА>
      tasks: 2 # ПАРАЛЛЕЛИЗМ

На сервере, с которого производилась установка выполнить команду:

  • для установки коннектора:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t start_connector
  • для перезапуска:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t stop_connector,start_connector

где ID - имя созданного inventory. Данные операции выполнятся для всех коннекторов из файла vars.yml, блок replicator.connectors. Для проведения операции над конкретными коннекторами добавляются теги run_only,<имя коннектора1>,<имя коннектора2>.

Например:

ansible-playbook -i inventories/<ID>/inventory replicator.yml --ask-vault-pass -t stop_connector,start_connector,run_only,<имя коннектора1>,<имя коннектора2>

При помощи Jenkins

Выполнить перезапуск и быстро восстановить работоспособность коннекторов SEDR можно с помощью задания Jenkins replicator_config_deploy.

Предварительно необходимо заполнить файл vars.yml, блок replicator.connectors, либо файл connectors.yml, который находится в директории Ansible/inventories/<inventory>, списком коннекторов. Порядок строк в файле имеет значение. Перезапуск или установка коннекторов осуществляется в порядке, указанном в файле, сверху вниз. Первыми будут установлены те коннекторы, что расположены выше.

Для перезапуска коннекторов выбрать:

  • параметр all_releases_from_vars - если требуется запустить все коннекторы, указанные в настройке inventory (запустятся все коннекторы, указанные в файле connectors.yml или в файле vars.yml, блок replicator.connectors)

  • в параметре custom_releases_from_vars выбрать те коннекторы, которые необходимо запустить (в данном параметре подгружается список всех коннекторов из файла connectors.yml (файла vars.yml, блок replicator.connectors) в директории Ansible/inventories/<inventory>, можно выбрать конкретный коннектор, работоспособность которого необходимо восстановить).

При работе с одним из этих параметров выкачивания дистрибутивов из пространства nexus не производится, вследствие чего сокращается время работы задания Jenkins. Иначе происходит выкачивание из nexus версии дистрибутива, указанной в параметре nexus_version.

Журналирование#

Для SEDR может быть настроена отправка системных журналов в централизованную систему журналирования (для банковских инсталляций - Platform V Monitor).

Предварительно необходимо узнать названия топиков централизованной системы журналирования (для банковских инсталляций - Platform V Monitor), куда будут отправляться логи SEDR.

Перед установкой (переустановкой) новой версии SEDR необходимо в конфигурационном файле vars.yml заполнить блок:

 logback_kafka_appender:
    enable: false # включение механизма отправки логов в Kafka через logback. ТОЛЬКО при расположении не на серверах кафки! (true в случае использования механизма отправки логов в Platform V Monitoring)
    libs:
      - logback-core-1.2.3.jar
      - logback-classic-1.2.3.jar
      - logback-kafka-appender-0.1.0.jar
      - gson-2.8.6.jar
      - logback-rolling-policy_2.12-1.0.10.jar
    libs_to_remove:
      - slf4j-log4j12-1.7.30.jar
    topic: <имя топика системы журналирования для отправки логов SEDR> # топик для отправки логов

и блок logback_kafka_appender.producer_configs:

      - "bootstrap.servers=<бутстрапы подключения к централизованной системе журналирования>"
      - "security.protocol=SSL"
      - "ssl.keystore.location=<указать путь до сертификатов подключения к централизованной системе журналирования>"
      - "ssl.keystore.password=<зашифрованный пароль подключения к keystore>"
      - "ssl.truststore.location=<указать путь до сертификатов подключения к централизованной системе журналирования>"
      - "ssl.truststore.password=<зашифрованный пароль подключения к truststore>"
      - "ssl.endpoint.identification.algorithm="

Настройки отправляемых сообщений указываются в файле /conf/logback.xml. Отправляемые сообщения имеют формат json согласно паттерну, описываемому в файле logback.xml.

Для настройки отправки сообщений необходимо заполнить блок аппендера ReplicatorLogger файла logback.xml. В данном блоке стоит обратить внимание на заполнение следующих параметров:

Параметр

Описание

Значение по умолчанию

message

Указывается, как будет называться поле для тела сообщения

message

timestamp

Указывается, как будет называться поле для временной метки

timestamp

level

Уровень логирования сообщения (INFO/WARN/ERROR/DEBUG/TRACE)

level

needAddThreadToMessage

Указывается нужно ли добавлять имя потока в поле с телом сообщения

true

name=host

Имя хоста расположения SEDR

dns-имя или ip

name=system

Имя воркера

Worker name

topic

Топик записи сообщений логов

replicator_log

client.id

Идентификатор продьюсера Kafka

пусто

bootstrap.servers

Хосты подключения к системе журналирования

host:port,host:port

security.protocol

Тип протокола подключения к системе журналирования

SSL

ssl.endpoint.identification.algorithm

пусто

appender-ref

Альтернативное место для сообщений логов

По умолчанию закомментировано

Пример файла с заполненными настройками:

<configuration scan="true" scanPeriod="30 seconds">
    <!-- Экранирование символа доллара -->
    <property scope="context" name="dollar" value="$" />

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file><путь до файла логов></file>
        <rollingPolicy class="ch.qos.logback.core.rolling.SBTTimeBasedRollingPolicy">
            <!-- rollover every daily -->
            <fileNamePattern>xxx-%d{yyyy-MM-dd}.%i.log.zip</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <!-- or whenever the file size reaches 50MB -->
                <maxFileSize>50MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <maxHistory>5</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %level %logger{60} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Аппендер для передачи логов репликатора в формате Platform V Monitoring -->
    <appender name="ReplicatorLogger" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ru.sbt.logback.layout.Grok">
                <message>message</message>
                <timestamp>timestamp</timestamp>
                <level>level</level>
                <needAddThreadToMessage>false</needAddThreadToMessage>
                <fields>
                    <![CDATA[
                           name=host        <host name>
                           name=nameWorker  CONST=<worker name>
                    ]]>
                </fields>
            </layout>
        </encoder>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <topic>synapse_replicator_topic</topic>
        <producerConfig>client.id=<Идентификатор продьюсера Kafka></producerConfig>
        <producerConfig>bootstrap.servers=<bootstrap подключения к Platform V Monitoring></producerConfig>
        <producerConfig>security.protocol=SSL</producerConfig>
        <producerConfig>ssl.keystore.location=<путь до сертификата подключения к Platform V Monitoring></producerConfig>
        <producerConfig>ssl.keystore.password=<зашифрованный пароль></producerConfig>
        <producerConfig>ssl.truststore.location=<путь до сертификата подключения к Platform V Monitoring></producerConfig>
        <producerConfig>ssl.truststore.password=<зашифрованный пароль></producerConfig>
        <producerConfig>ssl.endpoint.identification.algorithm=</producerConfig>
        <producerConfig>config.providers=decode</producerConfig>
        <producerConfig>config.providers.decode.class=xxx</producerConfig>
        <producerConfig>config.providers.decode.param.security.encoding.key=/opt/Apache/kafka/config/encrypt.pass</producerConfig>
        <producerConfig>config.providers.decode.param.security.encoding.class=password.decoder.SimpleTextPasswordDecoder</producerConfig>
        <producerConfig>config.providers.decode.param.security.encoding.salt=password.salt.SaltProvider</producerConfig>
        <appender-ref ref="file"/>
    </appender>

    <appender name="ASYNC-logger" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="ReplicatorLogger"/>
    </appender>

    <logger name="org.apache.zookeeper" level="ERROR" additivity="false">
        <appender-ref ref="file"/>
        <appender-ref ref="ASYNC-logger"/>
    </logger>

    <logger name="org.I0Itec.zkclient" level="ERROR" additivity="false">
        <appender-ref ref="file"/>
        <appender-ref ref="ASYNC-logger"/>
    </logger>

    <logger name="org.reflections" level="ERROR" additivity="false">
        <appender-ref ref="file"/>
        <appender-ref ref="ASYNC-logger"/>
    </logger>
    
    <logger name="latency" level="DEBUG" additivity="false">
        <appender-ref ref="file"/>
    </logger>

    <root level="INFO">
        <appender-ref ref="file"/>
        <appender-ref ref="ASYNC-logger"/>
    </root>

</configuration>

События системного журнала#

Программный компонент SEDR сохраняет информацию о происходящих событиях в два файла: $replicator_logs_dir/kafka-connect-<наименование экземпляра техсервиса>.out - содержит события запуска экземпляра техсервиса. $replicator_logs_dir/kafka-connect-<наименование экземпляра техсервиса>.log - содержит события работы экземпляра техсервиса.

Наиболее часто встречающиеся события в файлах логов типа ERROR:

  1.  2021-11-30 19:37:01.195 [main] ERROR org.apache.kafka.connect.cli.ConnectDistributed  - Stopping due to error
     org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient<br>
         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:535)
         at org.apache.kafka.clients.admin.Admin.create(Admin.java:75)<br>
         at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:50)
         at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
         at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
     Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore ssl/kafka-cluster-1.jks of type JKS
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:377)
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:349)
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createKeystore(DefaultSslEngineFactory.java:299)
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:161)
         at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
         at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
         at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
         at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
         at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
         at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:508)
         ... 4 common frames omitted
     Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect
         at java.base/sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:795)
         at java.base/sun.security.util.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:222)
         at java.base/java.security.KeyStore.load(KeyStore.java:1479)
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:374)
         ... 14 common frames omitted
     Caused by: java.security.UnrecoverableKeyException: Password verification failed
         at java.base/sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:793)
         ... 17 common frames omitted| Не подошел пароль от хранилища (ssl.keystore.password или ssl.truststore.password)|
    

- Не подошел пароль от хранилища (ssl.keystore.password или ssl.truststore.password)

  1.  2021-11-30 19:38:49.313 [main] ERROR org.apache.kafka.connect.cli.ConnectDistributed  - Stopping due to error
     org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:535)
         at org.apache.kafka.clients.admin.Admin.create(Admin.java:75)
         at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:50)
         at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
         at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
     Caused by: org.apache.kafka.common.KafkaException: java.security.UnrecoverableKeyException: Cannot recover key
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:268)
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:173)
         at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
         at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
         at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
         at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
         at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
         at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:508)
         ... 4 common frames omitted
     Caused by: java.security.UnrecoverableKeyException: Cannot recover key
         at java.base/sun.security.provider.KeyProtector.recover(KeyProtector.java:304)
         at java.base/sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:144)
         at java.base/sun.security.util.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:90)
         at java.base/java.security.KeyStore.getKey(KeyStore.java:1057)
         at java.base/sun.security.ssl.SunX509KeyManagerImpl.<init>(SunX509KeyManagerImpl.java:145)
        at java.base/sun.security.ssl.KeyManagerFactoryImpl$SunX509.engineInit(KeyManagerFactoryImpl.java:70)
         at java.base/javax.net.ssl.KeyManagerFactory.init(KeyManagerFactory.java:271)
         at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:251)
         ... 12 common frames omitted
    

- Не подошел пароль от ключа в хранилище (ssl.key.password)

  1. 2021-11-30 19:40:56.194 [main] ERROR org.apache.kafka.connect.cli.ConnectDistributed  - Stopping due to error
     org.jasypt.exceptions.EncryptionOperationNotPossibleException: null
         at org.jasypt.encryption.pbe.StandardPBEByteEncryptor.decrypt(StandardPBEByteEncryptor.java:1055)
         at org.jasypt.encryption.pbe.StandardPBEStringEncryptor.decrypt(StandardPBEStringEncryptor.java:725)
         at org.jasypt.util.text.BasicTextEncryptor.decrypt(BasicTextEncryptor.java:112)
         at ru.sbt.ss.password.decoder.SimpleTextPasswordDecoder.decode(SimpleTextPasswordDecoder.java:57)
         at ru.sbt.ss.kafka.DecryptionConfigProvider.$anonfun$get$1(DecryptionConfigProvider.scala:51)
         at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:99)
         at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:86)
         at scala.collection.convert.JavaCollectionWrappers$JSetWrapper.map(JavaCollectionWrappers.scala:180)
         at ru.sbt.ss.kafka.DecryptionConfigProvider.get(DecryptionConfigProvider.scala:51)
         at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
         at org.apache.kafka.common.config.AbstractConfig.resolveConfigVariables(AbstractConfig.java:495)
         at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:107)
         at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
         at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:452)
         at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:405)
         at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
         at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
    

- Ключом из conf/encrypt.pass не смогли расшифровать любое зашифрованное значение в конфиге вида ${decode:C5eoLl0iVSaYIzTMYFs+DQ==}

  1. 2021-11-30 19:48:43.865 [KafkaBasedLog Work Thread - connect-offsets] WARN  org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer clientId=replicator.tkleq-snaps0003.vm.esrt.cloud.example.ru, groupId=pcidss-to-alpha-group] Not authorized to read from partition connect-offsets-11.
     2021-11-30 19:48:43.865 [KafkaBasedLog Work Thread - connect-offsets] ERROR org.apache.kafka.connect.util.KafkaBasedLog  - Error polling: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [connect-offsets]
     2021-11-30 19:48:43.865 [KafkaBasedLog Work Thread - connect-offsets] WARN  org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer clientId=replicator.tkleq-snaps0003.vm.esrt.cloud.example.ru, groupId=pcidss-to-alpha-group] Not authorized to read from partition connect-offsets-9.
     2021-11-30 19:48:43.866 [KafkaBasedLog Work Thread - connect-offsets] ERROR org.apache.kafka.connect.util.KafkaBasedLog  - Error polling: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [connect-offsets]
     2021-11-30 19:48:43.866 [KafkaBasedLog Work Thread - connect-offsets] WARN  org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer clientId=replicator.tkleq-snaps0003.vm.esrt.cloud.example.ru, groupId=pcidss-to-alpha-group] Not authorized to read from partition connect-offsets-15.
     2021-11-30 19:48:43.866 [KafkaBasedLog Work Thread - connect-offsets] ERROR org.apache.kafka.connect.util.KafkaBasedLog  - Error polling: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [connect-offsets]
     2021-11-30 19:48:43.866 [KafkaBasedLog Work Thread - connect-offsets] WARN  org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer clientId=replicator.tkleq-snaps0003.vm.esrt.cloud.example.ru, groupId=pcidss-to-alpha-group] Not authorized to read from partition connect-offsets-13.
     2021-11-30 19:48:43.866 [KafkaBasedLog Work Thread - connect-offsets] ERROR org.apache.kafka.connect.util.KafkaBasedLog  - Error polling: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [connect-offsets]
    

    - Для текущего хранилища нет прав на использование системных топиков (в данном случае connect-offsets)

События мониторинга#

Мониторинг SEDR производится:

  • Самостоятельно через преднастроенный JMX-порт.

  • С помощью системы мониторинга Mayak.

Настройка системы мониторинга для SEDR#

Предварительно необходимо задать настройки в конфигурационном файле vars.yml (или в файле collectros.yml).

Пример заполненных настроек для коллекторов в файле vars.yml:

...
collector:
  installdir: /opt/collector # абсолютный путь на конечном сервере до приложения
  logdir: /logs/collector # абсолютный путь до дирeктории с логами
  cleanLog: false # очистка logdir при установке
  loglevel: INFO
  log_total_size_cap: 4GB
  influxKeystorePass: <логин подключения к Keystore Influx> # Обязательно
  edPassword: <зашифрованный пароль>
  kafkaKeystorePass: <логин подключения к Keystore EVTD> # Обязательно
  jmxPassword: <зашифрованный пароль>
  edHost: <host ETCD> # адрес etcd
  edPort: <port ETCD> # порт etcd
  no_edBootstrapJmx: <Bootstrap подключения к JMX>
  no_edBootstrapServers: <host1:port,host2:port> # host1:port,host2:port
  no_edFlinkTaskManagersJmx: <ip:port>
  no_edFlinkJobManagersJmx: <ip:port>
  no_edReplicatorJmx: <host1:port,host2:port подключения к репликатору JMX>
  no_edArtemisJmx: <host1:port,host2:port подключения к Artemis JMX>
  no_edEtcdHostPort: <host1:port,host2:port подключения к ETCD>
  list:
    <имя коллектора1>:
      type: replicator
      config: replicator-metrics.conf
      no_edReplicatorJmx: <host1:port,host2:port подключения к репликатору JMX>
    ...
    <имя коллектора N>:
      type: replicator
      config: replicator-metrics.conf
      no_edReplicatorJmx: <host1:port,host2:port подключения к репликатору JMX>
...

Ручной способ

На сервере, с которого производилась установка подсистемы мониторинга Mayak выполнить команду:

ansible-playbook -i inventories/<ID>/inventory collector.yml --ask-vault-pass 

где ID - имя созданного inventory. Данная операция выполнется для всех коллекторов из файла vars.yml, блок collector.list. Для проведения операции над конкретными коллекторами добавляются теги run_only,<имя коллектора1>,<имя коллектора2>.

Например:

ansible-playbook -i inventories/<ID>/inventory collector.yml --ask-vault-pass -t all,run_only,<имя коллектора1>,<имя коллектора2>

При помощи Jenkins

Установка/переустановка коллекторов мониторинга для SEDR производится с помощью задания Jenkins monitoring_custom.

Настраиваемые параметры:

  • playbook: collector.yml;

  • без тегов - установит все коллекторы, описанные в конфигурационном файле vars.yml;

  • по тегу run_only подтягивается список коллекторов usedCollectors из конфигурационного файла vars.yml (или из файла collectros.yml). В нем галочками можно отметить необходимые коллекторы для установки/переустановки.

Отслеживаемые метрики#

Область действия метрики

Метрика

Триггер

MBEAN Name

Экземпляр техсервиса

Количество воркеров

Не должно уменьшаться

kafka.connect:type=connect-worker-metrics, connector-count

Экземпляр техсервиса

Статус ребаланса

Не должен долгое время отличаться от 0

kafka.connect:type=connect-worker-rebalance-metrics, rebalancing

Экземпляр коннектора

Статус коннектора

Не равно "Running"

kafka.connect:type=connector-metrics,connector="<наименование экземпляра коннектора>", status

Экземпляр коннектора

Общее количество задач

должно быть > 0

kafka.connect:type=connect-worker-metrics,connector="<наименование экземпляра коннектора>", connector-total-task-count

Экземпляр коннектора

Количество задач завершившихся с ошибкой

должно быть 0

kafka.connect:type=connect-worker-metrics,connector="<наименование экземпляра коннектора>", connector-failed-task-count

Экземпляр коннектора

Количество выполняющихся задач

должно быть > 0

kafka.connect:type=connect-worker-metrics,connector="<наименование экземпляра коннектора>", connector-running-task-count

Задача

Максимальное время подтверждения чтения из техсервиса передачи событий

не больше 2000 миллисекунд

kafka.connect:type=connector-task-metrics,connector="<наименование экземпляра коннектора>",task="<порядковый номер задачи>", offset-commit-max-time-ms

Задача

Статус задачи

Не равно "Running"

kafka.connect:type=connector-task-metrics,connector="<наименование экземпляра коннектора>",task="<порядковый номер задачи>", status

Задача

Отставание вычитки от последнего сообщения

Больше 0

kafka.connect:type=connector-task-metrics,connector="<наименование экземпляра коннектора>",task="<порядковый номер задачи>", sink-record-lag-max

Часто встречающиеся проблемы и пути их устранения#

Не выявлено.