Описание источников событий#

Поддерживаются источники событий 6 видов:

  • Топик Apache Kafka;

  • Подключение к Apache Kafka через Event Discovery;

  • Очередь IBM MQ;

  • Чтение из файла (обработчик завершит работу после того, как будет достигнут конец файла);

  • Мониторинг директории с файлами в файловой системе;

  • Подключение Active MQ Artemis.

При файловом взаимодействии каждая строка в файле считается отдельным событием.

Описание источника событий#

Источником является объект конфигурации с полем *type* содержащим значение *source*, также содержит следующие поля:

  1. name — название источника событий;

  2. topic — имя топика для чтения сообщений;

  3. config — конфигурация транспорта источника;

  4. destination — описание следующего шага обработки события после источника.

Поле *config* содержит специфическую конфигурацию транспорта:

Apache Kafka:#

  1. type — тип транспорта, значение: «kafka»;

  2. secret — ключ для расшифровки паролей к хранилищам сертификатов, параметры «ssl.key.password», «ssl.keystore.password», «ssl.truststore.password»;

  3. consumer — настройки получателя сообщений;

  4. producer — настройки отправителя сообщений;

  5. Другие настройки Kafka как «bootstrap.servers» и т.д.;

  6. flinkSslConfigPath — необязательный параметр с путем к файлу конфигурации flink-conf.yaml, из которого необходимо получить ssl-контекст.

IBM MQ:#

  1. type — тип транспорта, значение: «ibm_mq»;

  2. secret — ключ для расшифровки пароля пользователя, параметра «password», опционально;

  3. host — host для подключения;

  4. port — порт для подключения;

  5. channel — имя канала для подключения;

  6. queueManager — имя менеджера очередей;

  7. user — имя пользователя для подключения, опционально;

  8. password — пароль пользователя для подключения, опционально;

  9. ssl.enabled — включение/отключение защищенного протокола, опционально;

  10. ssl.keystore.location — путь до хранилища сертификатов, опционально;

  11. ssl.keustore.password — пароль от хранилища сертификатов, опционально;

  12. ssl.truststore.location — путь до хранилища доверенных сертификатов, опционально;

  13. ssl.truststore.password — пароль от хранилища доверенных сертификатов, опционально;

  14. ssl.secret — ключ для расшифровки паролей к хранилищам сертификатов, параметры «ssl.store.password», «ssl.truststore.password»;

  15. receiveConversionClient — значение настройки WMQ_RECEIVE_CONVERSION : true — WMQ_RECEIVE_CONVERSION_CLIENT_MSG, false — WMQ_RECEIVE_CONVERSION_QMGR, по умолчанию false;

  16. headers — список заголовков для передачи в трансформацию;

  17. charset — кодировка входящего сообщения;

  18. sslPeerName — DN сертификата менеджера очередей, опционально;

  19. sslCipherSuite — шифр подключения к менеджеру, опционально;

  20. retryCount — количество попыток переподключения, по умолчанию 3;

  21. retryInterval — начальное время ожидания между попытками восстановления соединения в миллисекундах, по умолчанию 1000;

  22. retryMultiplier — множитель интервала со второй попытке подключения, по умолчанию 2;

  23. useIBMCipherMappings — использовать таблицу преобразования шифров IBM, по умолчанию false;

  24. mq — дополнительные параметры фабрики подключений.

Файл (обработчик завершит работу после того, как будет достигнут конец файла, рекомендуется использовать с параллелизмом 1):

  1. type — тип транспорта, значение: «file»;

  2. path — полный путь до файла или директории с файлами.

Мониторинг директории с файлами в файловой системе (обработчик периодически отслеживает изменения в файлах):

  1. type — тип транспорта, значение: «file-stream»;

  2. path — полный путь до файла или директории с файлами;

  3. interval — интервал проверки изменений в файле/директории в мс.

Active MQ Artemis#

Адрес и имя очереди, из которой необходимо вычитывать сообщения, указываются в поле topic конфигурации источника событий через ::. Например, необходимо вычитывать из очереди queue1 по адресу test-input, тогда в поле topic необходимо указать test-input::queue1.
В случае, если имя очереди совпадает с адресом, то в данном поле можно указать только адрес. Например, необходимо вычитывать из очереди test с адресом test, тогда в поле topic необходимо указать test.

  1. type - тип транспорта, значение: "artemis_mq".

  2. connectors - список брокеров для подключения, в формате host:port через запятую.

  3. secret - ключ для шифрования паролей, не обязательный параметр (если необходимо шифрование паролей, то необходимо указать).

  4. charset - кодировка сообщений, по умолчанию UTF-8.

  5. connectorConfigs - ssl-конфигурация брокера для подключения (в случае использования HashiCorp Vault здесь также указываются настройки подключения к HashiCorp Vault, Настройки выпуска или получения сертификатов из HashiCorp Vault):

    • sslEnabled включение/отключение SSL, по умолчанию false (в случае использования HashiCorp Vault данную настройку можно не указывать, будет использоваться значение true);

    • keyStorePath путь до хранилища приватного ключа;

    • keyStorePassword пароль от хранилища приватного ключа;

    • trustStorePath путь до хранилища доверенных сертификатов;

    • trustStorePassword пароль от хранилища доверенных сертификатов;

    • verifyHost включение/отключение проверки имени хоста клиента и CN сертификата клиента, по умолчанию true.

  6. factory параметры к org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory:

    • blockOnqueue синхронная отправка сообщений в очередь, по умолчанию false;

    • loadBalancingPolicyClassName имя балансировщика нагрузки, по умолчанию org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy (первым подключением выбирается случайное из списка). В дистрибутиве присутствует альтернативный балансировщик ru.sbt.flink.streaming.connectors.jms.artemismq.utils.RoundRobinLoadBalancingPolicy, который всегда начинает с первого подключения;

    • reconnectAttempts количество повторных попыток подключения, по умолчанию 0;

    • recoveryInterval интервал между попытками восстановления соединения, по умолчанию 2000 мс;

    • recoveryIntervalMultiplier множитель, на который будет умножаться каждый следующий интервал между переподключениями, по умолчанию 1 (между попытками переподключениями будет один и тот же интервал). Введенное значение округляется до целого;

    • confirmationWindowSize размер буфера команд, переданных от клиента серверу, по умолчанию -1 (буферизация отключена);

    • callTimeout время при отправке через кластерное соединение, по умолчанию 30000 мс.

  7. receiveMode - тип подписки - может принимать значения subscribe или poll (синхронный или асинхронный режим приема сообщений), по умолчанию subscribe.

  8. receiveTimeout - время ожидания получения сообщения, по умолчанию 0 мс (время ожидания не ограничено).

Источники событий#

source: {
   name: "kafka"
   type: "source"
   topic: "input"
   destination: {}
   config: {
      type: "kafka"
      "bootstrap.servers":"localhost:9092"
      consumer: {
         "group.id": "event-process-flow-group"
         "client.id": "event-process-flow-client"
      }
      producer: {
         "client.id": "event-process-flow-producer"
      }
   }
}
source: {
      name: "file"
      type: "source"
      topic: "input"
      destination: {}
      config: {
        type: "file"
        path: "/path/to/file.txt"
      }
    }
source: {
      name: "file-stream"
      type: "source"
      topic: "input"
      destination: {}
      config: {
        type: "file-stream"
        path: "/path/to/directory/"
        interval: "5000"
      }
    }
source: {
    name: "IBM MQ source"
    type: "source"
    topic: "TRANSPORT.TO.KAFKA"
    config: {
        type: "ibm_mq"
        host: "broker.host"
        port: "1415"
        channel: "ANTON_SSL"
        queueManager: "M99.CEP.EL1"
        secret: "/ssl/encrypt.pass"
        ssl: {
            enabled: "true"
            protocol: "TLSv1.2"
            keystore: {
                location: "/ssl/app.jks"
                password: "tXuQ5VOZSHwXmi0rS0fmqw=="
            }
            truststore: {
                location: "/ssl/app.jks"
                password: "fvYQdHnAMaXPuJi1x1OSZQ=="
            }
        }
        charset: "UTF-8"
        sslPeerName: "CN=peerName"
        sslCipherSuite: "TLS_RSA_WITH_AES_128_CBC_SHA256"
        useIBMCipherMappings: "false"
    }
    destination: {}
  }
  source: {
   name: "Active MQ Artemis source"
   type: "source"
   topic: "flink-flow-test-input"
   config: {
      type: "artemis_mq"
      connectors: "10.25.68.8:31617, localhost:61616"
      user: "test_user"
      secret: "secret"
      password: "password"
      connectorConfigs: {
         sslEnabled: "true"
         keyStorePath: "keystore-path"
         keyStorePassword: "enc:X43sOqEAPIrd6UfD3yw6Qk/f8zjA5Hc44Lpz1SPq86fJMWf4UC/T3MHGP07oroCM"
         trustStorePath: "truststore-path"
         trustStorePassword: "enc:I+e0v/Tm+Qonb0s9xAAXpnkhuI9nMhqbiMmqm+DAkuYhvYxY/KdG3qNGnBZ+HnHz"
         verifyHost: "true"
      }
      factory: {
         blockOnQueue: true
         reconnectAttempts: 1
         recoveryInterval: 3000
         recoveryIntervalMultiplier: 2
         confirmationWindowSize: 1
         loadBalancingPolicyClassName: "ru.sbt.flink.streaming.connectors.jms.artemismq.utils.RoundRobinLoadBalancingPolicy"
         callTimeout: 3000
      }
      receiveMode: "poll"
      receiveTimeout: 3000
   }
}

Описание источника событий с подключением через сервис Event Discovery#

Является объект конфигурации с полем type содержащим значение «eventDiscoverySource», также содержит следующие поля:

  1. name — название источника событий;

  2. uri — идентификатор события в Event Discovery, например: fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1;

  3. config — конфигурация транспорта источника;

  4. destination — описание следующего шага обработки события после источника.

При старте обработчика происходит запрос в Event Discovery, откуда по имени и версии события возвращаются параметры подключения к конкретному кластеру Apache Kafka. При изменении параметров подключения в Event Discovery необходимо перезапустить обработчик.

Параметры, получаемые из Event Discovery:

  1. bootstrap.servers — адрес серверов кластера Кафки;

  2. topic — имя топика.

Источник события с Event Discovery#

source: {
    name: "source_ed"
    type: "eventDiscoverySource"
    uri: "fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1"
    config: {}
    destination: {}
  }