Описание источников событий#
Поддерживаются источники событий 4 видов:
Topic Apache Kafka;
Подключение к Apache Kafka через Event Discovery;
Чтение из файла (обработчик завершит работу после того, как будет достигнут конец файла);
Мониторинг директории с файлами в файловой системе.
При файловом взаимодействии каждая строка в файле считается отдельным событием
Описание источника событий#
Источником является объект конфигурации с полем «type», содержащим значение source со следующими полями:
name— название источника событий;topic— имя topic для чтения сообщений;config— конфигурация транспорта источника;destination— описание следующего шага обработки события после источника.
Поле config содержит специфическую конфигурацию транспорта:
Apache Kafka:
type— тип транспорта, значение: «kafka»;secret— ключ для расшифровки паролей к хранилищам сертификатов, параметры «ssl.key.password», «ssl.keystore.password», «ssl.truststore.password»;consumer— настройки получателя сообщений;producer— настройки отправителя сообщений;Другие настройки Kafka как «bootstrap.servers» и т.д.;
flinkSslConfigPath— необязательный параметр с путём к файлу конфигурации Flink (flink-conf.yaml), из которого необходимо получить ssl-контекст.
Файл (обработчик завершит работу после того, как будет достигнут конец файла, рекомендуется использовать с параллелизмом 1):
type— тип транспорта, значение: «file»;path— полный путь до файла или директории с файлами.
Мониторинг директории с файлами в файловой системе (обработчик периодически отслеживает изменения в файлах)
type— тип транспорта, значение: «file-stream»;path— полный путь до файла или директории с файлами;interval— интервал проверки изменений в файле/директории в мс.
Пример#
source: {
name: "kafka"
type: "source"
topic: "input"
destination: {}
config: {
type: "kafka"
"bootstrap.servers":"localhost:9092"
consumer: {
"group.id": "event-process-flow-group"
"client.id": "event-process-flow-client"
}
producer: {
"client.id": "event-process-flow-producer"
}
}
}
source: {
name: "file"
type: "source"
topic: "input"
destination: {}
config: {
type: "file"
path: "/path/to/file.txt"
}
}
source: {
name: "file-stream"
type: "source"
topic: "input"
destination: {}
config: {
type: "file-stream"
path: "/path/to/directory/"
interval: "5000"
}
}
Описание источника событий с подключением через сервис Event Discovery#
Источником является объект конфигурации с полем type, содержащим значение «eventDiscoverySource»со следующими полями:
name— название источника событий;uri— идентификатор события в Event Discovery, например: fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1;config— конфигурация транспорта источника;destination— описание следующего шага обработки события после источника.
При старте обработчика происходит запрос в Event Discovery, откуда по имени и версии события возвращаются параметры подключения к конкретному кластеру Apache Kafka. При изменении параметров подключения в Event Discovery необходимо перезапустить обработчик. Параметры, получаемые из Event Discovery:
bootstrap.servers— адрес серверов кластера Kafka;topic— имя topic.
Пример:
source: {
name: "source_ed"
type: "eventDiscoverySource"
uri: "fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1"
config: {}
destination: {}
}