Работа с базами данных#
В шагах Трансформация событий и Агрегация событий предусмотрена работа с использованием БД.
Значение поля database представляет собой объект с полями:
name— имя;config— конфигурация для подключения к базе.
Если присутствуют свойства, содержащие чувствительную информацию, их следует передавать в зашифрованном виде. Зашифрованные значения в конфигурации начинаются с префикса enc:.
Пример конфигурации будет приведен ниже.
Использование
kafka:type— со значениемkafka;compactTopic— имя топика, используемого в качестве справочника;cacheType— тип хранения справочника, возможные значения default, file, heap, memorydirect;cachePath— путь до файла, используемого приcacheTypeравном file;secret— ключ для расшифровки паролей к хранилищам сертификатов, параметры ssl.key.password, ssl.keystore.password, ssl.truststore.password;consumer— настройки получателя сообщений;producer— настройки отправителя сообщений; Другие настройки Kafka, такие, как bootstrap.servers и т.д.
Использование различных БД, на основе подключения к ним через jdbc connection url:
type— со значениемdatabase;url— jdbc connection url. Если в url присутствуют свойства, значения которых зашифрованы, то дешифроваться будут только свойства с именамиpassword,passбез учета регистра написания;driver— имя класса jdbc-драйвера;secret— секрет для расшифровки паролей. Опциональная настройка (если присутствуют зашифрованные свойства);vaultSecretPath— путь доkey-valueхранилища, в котором лежат секреты, необходимые для подключения к БД. Опциональная настройка (если необходимы секреты из Vault);props— настройки, необходимые для создания соединения с базой данных;timeout— тайм-аут обращения к БД в секундах, по умолчанию 5;retryCount— количество попыток пересоздания соединения к БД, по умолчанию 5;retryInterval— интервал в миллисекундах между попытками подключения, по умолчанию 1000;retryMultiplier— множитель увеличения интервала для следующей попытки, по умолчанию 1;typeCasting- позволяет приводить результат к типу, полученному из запроса, по-умолчанию false. Приведение типов выполняется для результатов запроса, содержащих данные типа XML или JSON, результаты запроса, содержащие другие типы данных, приводятся к строке.
Если присутствуют свойства, значения которых зашифрованы, то дешифроваться будут только свойства с именами — password, pass, auth.secret.id, auth.role.id без учета регистра написания.
В classpath доступны следующие jdbc-драйверы: org.postgresql.Driver, com.mysql.cj.jdbc.Driver, oracle.jdbc.driver.OracleDriver, org.apache.ignite.IgniteJdbcThinDriver.
Пример конфигурации приведен ниже.
Для работы с kafka используются функции:
getFromKafka(«key») — получение значения с ключом «key»;
getJsonFromKafkaCache() — получение всех значений кеша;
containsInKafka(«key») — проверка на наличие значения «key» в справочнике:
true- есть,false- нет;putInKafka(«key», «value») — добавление значения «value» с ключом «key» в справочник;
putJsonInKafka($value) — $value воспринимается, как JSON-объект, поля которого и являются парами
ключ:значение;removeFromKafka(«key») — удаление значения с ключом «key» из справочника.
Пример использования kafka#
transformStep: {
name: "transformation-with-kafka-as-db"
type: "dsl"
format: {
input: "json"
output: "json"
}
dsl: {
source: "file"
path: "Through.tr"
}
database: {
config: {
type: "kafka"
bootstrap.servers: "host.docker.internal:9092"
consumer: {
group.id: "event-process-flow-group"
client.id: "event-process-flow-client"
}
producer: {
client.id: "event-process-flow-producer"
}
compactTopic: "compactTopic"
initTimeout: "600000"
}
}
destination: {}
}
Также для работы с kafka предусмотрены дополнительно две функции, представлены только в трансформации с type: "dsl":
getFromKafkaWithConverter(«key») — позволяет конвертировать сообщение из
kafkaво внутреннюю структуру сообщения;putInKafkaWithConverter(«key», value) — позволяет перед отправкой в Apache Kafka трансформировать сообщение из внутренней структуры в массив байт.
Пример использования функций#
Для использования этих функций необходимо задать следующую конфигурацию:
transformStep: {
name: "transformation-with-kafka-as-db"
type: "dsl"
format: {
input: "json"
output: "json"
}
dsl: {
source: "file"
path: "Through.tr"
}
database: {
config: {
type: "kafka"
bootstrap.servers: "host.docker.internal:9092"
consumer: {
group.id: "event-process-flow-group"
client.id: "event-process-flow-client"
}
producer: {
client.id: "event-process-flow-producer"
}
compactTopic: "compactTopic"
initTimeout: "600000"
format: {
type: json - формат сообщения: json, xml, avro
schema: [{path: "pathToSchema"}] # Путь до схемы сообщения. Если схема указана, то происходит валидация полученного сообщения на соответствие указанной схеме
} # В случае использования avro-формата, использование схемы обязательно
}
}
destination: {}
}
Для работы с типом database используются следующие функции:
execPreparedInDatabase(«selectAllFromAccounts») — первый аргумент — ключ запроса из конфигурации, все последующие аргументы, поля, которые будут подставляться в запрос в таком же порядке, как были переданы;
putJsonPreparedInDatabase(«table-name», «value») — «value» воспринимается как JSON-объект, поля которого и являются парами
ключ:значение, все последующие аргументы, поля, которые будут подставляться в запрос в таком же порядке, как были переданы;getJsonFieldPreparedFromDatabase(«selectTimeFromAccount», in.body.AcctId);
getJsonPreparedFromDatabaseTable(«table-name») — для работы с таблицами типа «create table cache ( key VARCHAR(255) PRIMARY KEY, value VARCHAR(255) NOT NULL);» — результат преобразуется в JSON-объект.
Следующие функции возвращают ответ в формате:
{
"isException": "<true/false>",
"result": "<результат выполнения функции (заполнено при значении isException - false)>",
"exceptionMessage": "<сообщение об ошибке (заполнено при значении isException - true)>"
}
Тип подставляемого аргумента будет получен из типа аргумента, то есть:
при запросе "insertUsers": "INSERT INTO users (email, password) VALUES (?, ?);" и вызове функции execPreparedInDatabase("insertUsers", "test@email.com", 123i) в запрос будет подставлена первым аргументом строка, вторым — целое число.
Пример использования database#
transformStep: {
name: "transformation"
type: "dsl"
format: {
input: "json"
output: "json"
}
dsl: {
path: "script_database.tr"
}
database: {
config: {
type: database
url: "jdbc:postgresql://localhost:5432/testDB"
driver: "org.postgresql.Driver"
secret: "secretValue"
vaultSecretPath: "kv1/test_dir"
props: {
# настройки подключения к базе данных
sslfactory: "ru.sbt.cep.flow.event.process.config.parsing.flow.step.storage.postgresql.PostgresqlSslFactory"
sslmode: "verify-ca"
# Для получения параметра из Key-Value хранилища Hashicorp Vault необходимо использовать префикс "vault:" с указанием имени секрета
password: "__PLACEHOLDER__"
key_1: "vault:__PLACEHOLDER__"
ssl: {
# Настройки для создания ssl-контекста без использования HashiCorp Vault
#keystore: {
# location: "pathToKeystore"
# password: "__PLACEHOLDER__"
#}
#truststore: {
# location: "pathToTruststore"
# password: "__PLACEHOLDER__"
#}
#key.password: "__PLACEHOLDER__"
# Настройки подключения к HashiCorp Vault(при необходимости) и использования их для создания ssl-контекста при подключении к базе данных
# А также получения других настроек для которых необходимо подключение к Vault
vault: {
"address": "VaultHost"
"auth.type": "APPROLE"
... # иные настройки для подключения к HashiCorp Vault
}
}
}
# Запросы, используемые в функциях с PreparedStatement
statements: {
"selectTimeFromAccount": "SELECT time FROM accounts WHERE acctId = ?;"
"selectAllFromAccounts": "SELECT * FROM accounts;"
"deleteFromAccounts": "DELETE FROM accounts WHERE acctId = ?;"
"mergeIntoAccounts": "MERGE INTO accounts VALUES (?, ?);"
}
timeout: "100"
retryCount: "5"
retryInterval: "500"
retryMultiplier: "2"
typeCasting: true
}
}
destination: ${destination}
}
Пример подключения к Apache Ignite#
transformStep: {
name: "transformation"
type: "dsl"
format: {
input: "json"
output: "json"
}
dsl: {
path: "testDsl/database/script_database.tr"
}
database: {
config: {
type: database
url: "jdbc:ignite:thin://localhost:10800/PUBLIC"
driver: "org.apache.ignite.IgniteJdbcThinDriver"
secret: "secretValue"
vaultSecretPath: "kv1/test_dir"
props: {
# настройки подключения к базе данных
sslEnabled: true
sslFactory: "ru.sbt.cep.flow.event.process.config.parsing.flow.step.storage.ignite.IgniteSslFactory"
// Для получения параметра из Key-Value хранилища Hashicorp Vault необходимо использовать префикс "vault:" с указанием имени секрета
password: "vault:dbPass"
key_1: "vault:aliasForKey"
ssl: {
# Настройки для создания ssl-контекста без использования HashiCorp Vault
#keystore: {
# location: "pathToKeystore"
# password: "passValue"
#}
#truststore: {
# location: "pathToTruststore"
# password: "passValue"
#}
#key.password: "passValue"
# Настройки подключения к HashiCorp Vault(при необходимости) и использования их для создания ssl-контекста при подключении к базе данных
# А также получения других настроек для которых необходимо подключение к Vault
vault: {
"address": "VaultHost"
"auth.type": "APPROLE"
... иные настройки для подключения к HashiCorp Vault
}
}
}
# Запросы, используемые в функциях с PreparedStatement
statements: {
"selectTimeFromAccount": "SELECT time FROM accounts WHERE acctId = ?;"
"selectAllFromAccounts": "SELECT * FROM accounts;"
"deleteFromAccounts": "DELETE FROM accounts WHERE acctId = ?;"
"mergeIntoAccounts": "MERGE INTO accounts VALUES (?, ?);"
}
timeout: "100"
retryCount: "5"
retryInterval: "500"
retryMultiplier: "2"
typeCasting: true
}
}
destination: ${destination}
}