Описание источников событий#

Поддерживаются источники событий 4 видов:

  • Topic Apache Kafka;

  • Подключение к Apache Kafka через Event Discovery;

  • Чтение из файла (обработчик завершит работу после того, как будет достигнут конец файла);

  • Мониторинг директории с файлами в файловой системе.

При файловом взаимодействии каждая строка в файле считается отдельным событием

Описание источника событий#

Источником является объект конфигурации с полем «type», содержащим значение source со следующими полями:

  1. name — название источника событий;

  2. topic — имя topic для чтения сообщений;

  3. config — конфигурация транспорта источника;

  4. destination — описание следующего шага обработки события после источника.

Поле config содержит специфическую конфигурацию транспорта:

Apache Kafka:

  1. type — тип транспорта, значение: «kafka»;

  2. secret — ключ для расшифровки паролей к хранилищам сертификатов, параметры «ssl.key.password», «ssl.keystore.password», «ssl.truststore.password»;

  3. consumer — настройки получателя сообщений;

  4. producer — настройки отправителя сообщений;

  5. Другие настройки Kafka как «bootstrap.servers» и т.д.;

  6. flinkSslConfigPath — необязательный параметр с путём к файлу конфигурации Flink (flink-conf.yaml), из которого необходимо получить ssl-контекст.

Файл (обработчик завершит работу после того, как будет достигнут конец файла, рекомендуется использовать с параллелизмом 1):

  1. type — тип транспорта, значение: «file»;

  2. path — полный путь до файла или директории с файлами.

Мониторинг директории с файлами в файловой системе (обработчик периодически отслеживает изменения в файлах)

  1. type — тип транспорта, значение: «file-stream»;

  2. path — полный путь до файла или директории с файлами;

  3. interval — интервал проверки изменений в файле/директории в мс.

Пример#

source: {
      name: "kafka"
      type: "source"
      topic: "input"
      destination: {}
      config: {
        type: "kafka"
        "bootstrap.servers":"localhost:9092"
        consumer: {
            "group.id": "event-process-flow-group"
            "client.id": "event-process-flow-client"
            }
        producer: {
            "client.id": "event-process-flow-producer"
            }
        }
    }
source: {
      name: "file"
      type: "source"
      topic: "input"
      destination: {}
      config: {
        type: "file"
        path: "/path/to/file.txt"
      }
    }
source: {
      name: "file-stream"
      type: "source"
      topic: "input"
      destination: {}
      config: {
        type: "file-stream"
        path: "/path/to/directory/"
        interval: "5000"
      }
    }

Описание источника событий с подключением через сервис Event Discovery#

Источником является объект конфигурации с полем type, содержащим значение «eventDiscoverySource»со следующими полями:

  1. name — название источника событий;

  2. uri — идентификатор события в Event Discovery, например: fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1;

  3. config — конфигурация транспорта источника;

  4. destination — описание следующего шага обработки события после источника.

При старте обработчика происходит запрос в Event Discovery, откуда по имени и версии события возвращаются параметры подключения к конкретному кластеру Apache Kafka. При изменении параметров подключения в Event Discovery необходимо перезапустить обработчик. Параметры, получаемые из Event Discovery:

  1. bootstrap.servers — адрес серверов кластера Kafka;

  2. topic — имя topic.

Пример:

source: {
    name: "source_ed"
    type: "eventDiscoverySource"
    uri: "fpss://Event_flow_test_Domain1.Common/COD/TESTFLOWINPUT/1"
    config: {}
    destination: {}
  }