Описание источников событий#
Поддерживаются источники событий 6 видов:
Топик Apache Kafka;
Подключение к Apache Kafka через Event Discovery;
Очередь IBM MQ;
Чтение из файла (обработчик завершит работу после того, как будет достигнут конец файла);
Мониторинг директории с файлами в файловой системе;
Подключение Active MQ Artemis.
При файловом взаимодействии каждая строка в файле считается отдельным событием.
Описание источника событий#
Источником является объект конфигурации с полем *type* содержащим значение *source*, также содержит следующие поля:
name— название источника событий;topic— имя топика для чтения сообщений;config— конфигурация транспорта источника;destination— описание следующего шага обработки события после источника.
Поле *config* содержит специфическую конфигурацию транспорта:
Apache Kafka:#
type— тип транспорта, значение: «kafka»;secret— ключ для расшифровки паролей к хранилищам сертификатов, параметры «ssl.key.password», «ssl.keystore.password», «ssl.truststore.password»;consumer— настройки получателя сообщений;producer— настройки отправителя сообщений;Другие настройки Kafka как «bootstrap.servers» и т.д.;
flinkSslConfigPath— необязательный параметр с путем к файлу конфигурации flink-conf.yaml, из которого необходимо получить ssl-контекст.
IBM MQ:#
type— тип транспорта, значение: «ibm_mq»;secret— ключ для расшифровки пароля пользователя, параметра «password», опционально;host— host для подключения;port— порт для подключения;channel— имя канала для подключения;queueManager— имя менеджера очередей;user— имя пользователя для подключения, опционально;password— пароль пользователя для подключения, опционально;ssl.enabled— включение/отключение защищенного протокола, опционально;ssl.keystore.location— путь до хранилища сертификатов, опционально;ssl.keustore.password— пароль от хранилища сертификатов, опционально;ssl.truststore.location— путь до хранилища доверенных сертификатов, опционально;ssl.truststore.password— пароль от хранилища доверенных сертификатов, опционально;ssl.secret— ключ для расшифровки паролей к хранилищам сертификатов, параметры «ssl.store.password», «ssl.truststore.password»;receiveConversionClient— значение настройкиWMQ_RECEIVE_CONVERSION: true —WMQ_RECEIVE_CONVERSION_CLIENT_MSG, false —WMQ_RECEIVE_CONVERSION_QMGR, по умолчаниюfalse;headers— список заголовков для передачи в трансформацию;charset— кодировка входящего сообщения;sslPeerName— DN сертификата менеджера очередей, опционально;sslCipherSuite— шифр подключения к менеджеру, опционально;retryCount— количество попыток переподключения, по умолчанию3;retryInterval— начальное время ожидания между попытками восстановления соединения в миллисекундах, по умолчанию1000;retryMultiplier— множитель интервала со второй попытке подключения, по умолчанию2;useIBMCipherMappings— использовать таблицу преобразования шифров IBM, по умолчаниюfalse;mq— дополнительные параметры фабрики подключений.
Файл (обработчик завершит работу после того, как будет достигнут конец файла, рекомендуется использовать с параллелизмом 1):
type— тип транспорта, значение: «file»;path— полный путь до файла или директории с файлами.
Мониторинг директории с файлами в файловой системе (обработчик периодически отслеживает изменения в файлах):
type— тип транспорта, значение: «file-stream»;path— полный путь до файла или директории с файлами;interval— интервал проверки изменений в файле/директории в мс.
Active MQ Artemis#
Адрес и имя очереди, из которой необходимо вычитывать сообщения, указываются в поле topic
конфигурации источника событий через ::. Например, необходимо вычитывать из очереди queue1
по адресу test-input, тогда в поле topic необходимо указать test-input::queue1.
В случае, если имя очереди совпадает с адресом, то в данном поле можно указать только адрес. Например,
необходимо вычитывать из очереди test с адресом test, тогда в поле topic необходимо указать test.
type- тип транспорта, значение: "artemis_mq".connectors- список брокеров для подключения, в формате host:port через запятую.secret- ключ для шифрования паролей, не обязательный параметр (если необходимо шифрование паролей, то необходимо указать).charset- кодировка сообщений, по умолчаниюUTF-8.connectorConfigs- ssl-конфигурация брокера для подключения (в случае использования HashiCorp Vault здесь также указываются настройки подключения к HashiCorp Vault, Настройки выпуска или получения сертификатов из HashiCorp Vault):sslEnabledвключение/отключение SSL, по умолчаниюfalse(в случае использования HashiCorp Vault данную настройку можно не указывать, будет использоваться значениеtrue);keyStorePathпуть до хранилища приватного ключа;keyStorePasswordпароль от хранилища приватного ключа;trustStorePathпуть до хранилища доверенных сертификатов;trustStorePasswordпароль от хранилища доверенных сертификатов;verifyHostвключение/отключение проверки имени хоста клиента и CN сертификата клиента, по умолчаниюtrue.
factoryпараметры кorg.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory:blockOnqueueсинхронная отправка сообщений в очередь, по умолчаниюfalse;loadBalancingPolicyClassNameимя балансировщика нагрузки, по умолчаниюorg.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy(первым подключением выбирается случайное из списка). В дистрибутиве присутствует альтернативный балансировщикru.sbt.flink.streaming.connectors.jms.artemismq.utils.RoundRobinLoadBalancingPolicy, который всегда начинает с первого подключения;reconnectAttemptsколичество повторных попыток подключения, по умолчанию0;recoveryIntervalинтервал между попытками восстановления соединения, по умолчанию2000 мс;recoveryIntervalMultiplierмножитель, на который будет умножаться каждый следующий интервал между переподключениями, по умолчанию1(между попытками переподключениями будет один и тот же интервал). Введенное значение округляется до целого;confirmationWindowSizeразмер буфера команд, переданных от клиента серверу, по умолчанию-1(буферизация отключена);callTimeoutвремя при отправке через кластерное соединение, по умолчанию30000 мс.
receiveMode- тип подписки - может принимать значенияsubscribeилиpoll(синхронный или асинхронный режим приема сообщений), по умолчаниюsubscribe.receiveTimeout- время ожидания получения сообщения, по умолчанию0 мс(время ожидания не ограничено).
Источники событий#
source: {
name: "kafka"
type: "source"
topic: "input"
destination: {}
config: {
type: "kafka"
"bootstrap.servers":"localhost:9092"
consumer: {
"group.id": "event-process-flow-group"
"client.id": "event-process-flow-client"
}
producer: {
"client.id": "event-process-flow-producer"
}
}
}
source: {
name: "file"
type: "source"
topic: "input"
destination: {}
config: {
type: "file"
path: "/path/to/file.txt"
}
}
source: {
name: "file-stream"
type: "source"
topic: "input"
destination: {}
config: {
type: "file-stream"
path: "/path/to/directory/"
interval: "5000"
}
}
source: {
name: "IBM MQ source"
type: "source"
topic: "TRANSPORT.TO.KAFKA"
config: {
type: "ibm_mq"
host: "broker.host"
port: "1415"
channel: "ANTON_SSL"
queueManager: "M99.CEP.EL1"
secret: "/ssl/encrypt.pass"
ssl: {
enabled: "true"
protocol: "TLSv1.2"
keystore: {
location: "/ssl/app.jks"
password: "tXuQ5VOZSHwXmi0rS0fmqw=="
}
truststore: {
location: "/ssl/app.jks"
password: "fvYQdHnAMaXPuJi1x1OSZQ=="
}
}
charset: "UTF-8"
sslPeerName: "CN=peerName"
sslCipherSuite: "TLS_RSA_WITH_AES_128_CBC_SHA256"
useIBMCipherMappings: "false"
}
destination: {}
}
source: {
name: "Active MQ Artemis source"
type: "source"
topic: "flink-flow-test-input"
config: {
type: "artemis_mq"
connectors: "10.25.68.8:31617, localhost:61616"
user: "test_user"
secret: "secret"
password: "password"
connectorConfigs: {
sslEnabled: "true"
keyStorePath: "keystore-path"
keyStorePassword: "enc:X43sOqEAPIrd6UfD3yw6Qk/f8zjA5Hc44Lpz1SPq86fJMWf4UC/T3MHGP07oroCM"
trustStorePath: "truststore-path"
trustStorePassword: "enc:I+e0v/Tm+Qonb0s9xAAXpnkhuI9nMhqbiMmqm+DAkuYhvYxY/KdG3qNGnBZ+HnHz"
verifyHost: "true"
}
factory: {
blockOnQueue: true
reconnectAttempts: 1
recoveryInterval: 3000
recoveryIntervalMultiplier: 2
confirmationWindowSize: 1
loadBalancingPolicyClassName: "ru.sbt.flink.streaming.connectors.jms.artemismq.utils.RoundRobinLoadBalancingPolicy"
callTimeout: 3000
}
receiveMode: "poll"
receiveTimeout: 3000
}
}
Описание источника событий с подключением через сервис Event Discovery#
Является объект конфигурации с полем type содержащим значение «eventDiscoverySource», также содержит следующие поля:
name— название источника событий;uri— идентификатор события в Event Discovery, например: fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1;config— конфигурация транспорта источника;destination— описание следующего шага обработки события после источника.
При старте обработчика происходит запрос в Event Discovery, откуда по имени и версии события возвращаются параметры подключения к конкретному кластеру Apache Kafka. При изменении параметров подключения в Event Discovery необходимо перезапустить обработчик.
Параметры, получаемые из Event Discovery:
bootstrap.servers— адрес серверов кластера Кафки;topic— имя топика.
Источник события с Event Discovery#
source: {
name: "source_ed"
type: "eventDiscoverySource"
uri: "fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1"
config: {}
destination: {}
}