Работа с базами данных#

В шагах Трансформация событий и Агрегация событий предусмотрена работа с использованием БД.

Значение поля database представляет собой объект с полями:

  1. name — имя;

  2. config — конфигурация для подключения к базе.

Если присутствуют свойства, содержащие чувствительную информацию, их следует передавать в зашифрованном виде. Зашифрованные значения в конфигурации начинаются с префикса enc:.

Пример конфигурации будет приведен ниже.

  • Использование kafka:

    • type — со значением kafka;

    • compactTopic — имя топика, используемого в качестве справочника;

    • cacheType — тип хранения справочника, возможные значения default, file, heap, memorydirect;

    • cachePath — путь до файла, используемого при cacheType равном file;

    • secret — ключ для расшифровки паролей к хранилищам сертификатов, параметры ssl.key.password, ssl.keystore.password, ssl.truststore.password;

    • consumer — настройки получателя сообщений;

    • producer — настройки отправителя сообщений; Другие настройки Kafka, такие, как bootstrap.servers и т.д.

  • Использование различных БД, на основе подключения к ним через jdbc connection url:

    • type — со значением database;

    • url — jdbc connection url. Если в url присутствуют свойства, значения которых зашифрованы, то дешифроваться будут только свойства с именами password,pass без учета регистра написания;

    • driver — имя класса jdbc-драйвера;

    • secret — секрет для расшифровки паролей. Опциональная настройка (если присутствуют зашифрованные свойства);

    • vaultSecretPath — путь до key-value хранилища, в котором лежат секреты, необходимые для подключения к БД. Опциональная настройка (если необходимы секреты из Vault);

    • props — настройки, необходимые для создания соединения с базой данных;

    • timeout — тайм-аут обращения к БД в секундах, по умолчанию 5;

    • retryCount — количество попыток пересоздания соединения к БД, по умолчанию 5;

    • retryInterval — интервал в миллисекундах между попытками подключения, по умолчанию 1000;

    • retryMultiplier — множитель увеличения интервала для следующей попытки, по умолчанию 1;

    • typeCasting - позволяет приводить результат к типу, полученному из запроса, по-умолчанию false. Приведение типов выполняется для результатов запроса, содержащих данные типа XML или JSON, результаты запроса, содержащие другие типы данных, приводятся к строке.

Если присутствуют свойства, значения которых зашифрованы, то дешифроваться будут только свойства с именами — password, pass, auth.secret.id, auth.role.id без учета регистра написания.

В classpath доступны следующие jdbc-драйверы: org.postgresql.Driver, com.mysql.cj.jdbc.Driver, oracle.jdbc.driver.OracleDriver, org.apache.ignite.IgniteJdbcThinDriver.

Пример конфигурации приведен ниже.

Для работы с kafka используются функции:

  • getFromKafka(«key») — получение значения с ключом «key»;

  • getJsonFromKafkaCache() — получение всех значений кеша;

  • containsInKafka(«key») — проверка на наличие значения «key» в справочнике: true - есть, false - нет;

  • putInKafka(«key», «value») — добавление значения «value» с ключом «key» в справочник;

  • putJsonInKafka($value) — $value воспринимается, как JSON-объект, поля которого и являются парами ключ:значение;

  • removeFromKafka(«key») — удаление значения с ключом «key» из справочника.

Пример использования kafka#

transformStep: {
    name: "transformation-with-kafka-as-db"
    type: "dsl"
    format: {
      input: "json"
      output: "json"
    }
    dsl: {
      source: "file"
      path: "Through.tr"
    }
    database: {
      config: {
        type: "kafka"
        bootstrap.servers: "host.docker.internal:9092"
        consumer: {
            group.id: "event-process-flow-group"
            client.id: "event-process-flow-client"
        }
        producer: {
          client.id: "event-process-flow-producer"
        }
        compactTopic: "compactTopic"
        initTimeout: "600000"    
      }
    }

    destination: {}
  }

Также для работы с kafka предусмотрены дополнительно две функции, представлены только в трансформации с type: "dsl":

  • getFromKafkaWithConverter(«key») — позволяет конвертировать сообщение из kafka во внутреннюю структуру сообщения;

  • putInKafkaWithConverter(«key», value) — позволяет перед отправкой в Apache Kafka трансформировать сообщение из внутренней структуры в массив байт.

Пример использования функций#

Для использования этих функций необходимо задать следующую конфигурацию:

transformStep: {
    name: "transformation-with-kafka-as-db"
    type: "dsl"
    format: {
      input: "json"
      output: "json"
    }
    dsl: {
      source: "file"
      path: "Through.tr"
    }
    database: {
      config: {
        type: "kafka"
        bootstrap.servers: "host.docker.internal:9092"
        consumer: {
            group.id: "event-process-flow-group"
            client.id: "event-process-flow-client"
        }
        producer: {
          client.id: "event-process-flow-producer"
        }
        compactTopic: "compactTopic"
        initTimeout: "600000"
        format: {
            type: json - формат сообщения: json, xml, avro
            schema: [{path: "pathToSchema"}] # Путь до схемы сообщения. Если схема указана, то происходит валидация полученного сообщения на соответствие указанной схеме
        }   # В случае использования avro-формата, использование схемы обязательно
      }
    }
    destination: {}
  }

Для работы с типом database используются следующие функции:

  • execPreparedInDatabase(«selectAllFromAccounts») — первый аргумент — ключ запроса из конфигурации, все последующие аргументы, поля, которые будут подставляться в запрос в таком же порядке, как были переданы;

  • putJsonPreparedInDatabase(«table-name», «value») — «value» воспринимается как JSON-объект, поля которого и являются парами ключ:значение, все последующие аргументы, поля, которые будут подставляться в запрос в таком же порядке, как были переданы;

  • getJsonFieldPreparedFromDatabase(«selectTimeFromAccount», in.body.AcctId);

  • getJsonPreparedFromDatabaseTable(«table-name») — для работы с таблицами типа «create table cache ( key VARCHAR(255) PRIMARY KEY, value VARCHAR(255) NOT NULL);» — результат преобразуется в JSON-объект.

Следующие функции возвращают ответ в формате:

{
   "isException": "<true/false>",
   "result": "<результат выполнения функции (заполнено при значении isException - false)>",
   "exceptionMessage": "<сообщение об ошибке (заполнено при значении isException - true)>"
}

Тип подставляемого аргумента будет получен из типа аргумента, то есть: при запросе "insertUsers": "INSERT INTO users (email, password) VALUES (?, ?);" и вызове функции execPreparedInDatabase("insertUsers", "test@email.com", 123i) в запрос будет подставлена первым аргументом строка, вторым — целое число.

Пример использования database#

transformStep: {
  name: "transformation"
  type: "dsl"
  format: {
    input: "json"
    output: "json"
  }
  dsl: {
    path: "script_database.tr"
  }
  database: {
    config: {
      type: database
      url: "jdbc:postgresql://localhost:5432/testDB"
      driver: "org.postgresql.Driver"
      secret: "secretValue"
      vaultSecretPath: "kv1/test_dir"
      props: {
        # настройки подключения к базе данных 
        sslfactory: "ru.sbt.cep.flow.event.process.config.parsing.flow.step.storage.postgresql.PostgresqlSslFactory"
        sslmode: "verify-ca"
        # Для получения параметра из Key-Value хранилища Hashicorp Vault необходимо использовать префикс "vault:" с указанием имени секрета  
        password: "__PLACEHOLDER__"
        key_1: "vault:__PLACEHOLDER__"
        ssl: {
          # Настройки для создания ssl-контекста без использования HashiCorp Vault  
          #keystore: {
          #  location: "pathToKeystore"
          #  password: "__PLACEHOLDER__"
          #}
          #truststore: {
          #  location: "pathToTruststore"
          #  password: "__PLACEHOLDER__"
          #}
          #key.password: "__PLACEHOLDER__"  
          # Настройки подключения к HashiCorp Vault(при необходимости) и использования их для создания ssl-контекста при подключении к базе данных 
          # А также получения других настроек для которых необходимо подключение к Vault
          vault: {
            "address": "VaultHost"
            "auth.type": "APPROLE"
            ... # иные настройки для подключения к HashiCorp Vault
          }
        }
      }
      # Запросы, используемые в функциях с PreparedStatement
      statements: {
         "selectTimeFromAccount": "SELECT time FROM accounts WHERE acctId = ?;"
         "selectAllFromAccounts": "SELECT * FROM accounts;"
         "deleteFromAccounts": "DELETE FROM accounts WHERE acctId = ?;"
         "mergeIntoAccounts": "MERGE INTO accounts VALUES (?, ?);"
      }
      timeout: "100"
      retryCount: "5"
      retryInterval: "500"
      retryMultiplier: "2"
      typeCasting: true
    }
  }
  destination: ${destination}
}

Пример подключения к Apache Ignite#

transformStep: {
  name: "transformation"
  type: "dsl"
  format: {
    input: "json"
    output: "json"
  }
  dsl: {
    path: "testDsl/database/script_database.tr"
  }
  database: {
    config: {
      type: database
      url: "jdbc:ignite:thin://localhost:10800/PUBLIC"
      driver: "org.apache.ignite.IgniteJdbcThinDriver"
      secret: "secretValue"
      vaultSecretPath: "kv1/test_dir"
      props: {
        # настройки подключения к базе данных 
        sslEnabled: true
        sslFactory: "ru.sbt.cep.flow.event.process.config.parsing.flow.step.storage.ignite.IgniteSslFactory"
        // Для получения параметра из Key-Value хранилища Hashicorp Vault необходимо использовать префикс "vault:" с указанием имени секрета  
        password: "vault:dbPass"
        key_1: "vault:aliasForKey"
        ssl: {
          # Настройки для создания ssl-контекста без использования HashiCorp Vault  
          #keystore: {
          #  location: "pathToKeystore"
          #  password: "passValue"
          #}
          #truststore: {
          #  location: "pathToTruststore"
          #  password: "passValue"
          #}
          #key.password: "passValue"  
          # Настройки подключения к HashiCorp Vault(при необходимости) и использования их для создания ssl-контекста при подключении к базе данных 
          # А также получения других настроек для которых необходимо подключение к Vault
          vault: {
            "address": "VaultHost"
            "auth.type": "APPROLE"
            ... иные настройки для подключения к HashiCorp Vault
          }
        }
      }
      # Запросы, используемые в функциях с PreparedStatement
      statements: {
         "selectTimeFromAccount": "SELECT time FROM accounts WHERE acctId = ?;"
         "selectAllFromAccounts": "SELECT * FROM accounts;"
         "deleteFromAccounts": "DELETE FROM accounts WHERE acctId = ?;"
         "mergeIntoAccounts": "MERGE INTO accounts VALUES (?, ?);"
      }
      timeout: "100"
      retryCount: "5"
      retryInterval: "500"
      retryMultiplier: "2"
      typeCasting: true
    }
  }
  destination: ${destination}
}