Бекапирование топика в S3#
Поддерживается только SELECT * запрос
Коннектор будет корректно записывать все поля из Kafka в том виде, в котором они есть.
Класс коннектора#
io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
Поддержка KCQL#
Можно указать несколько операторов KCQL, разделенных «;» для работы с несколькими топиками. Для свойств соединителя topics или topics.regex задайте значение, соответствующее операторам KCQL.
В KCQL выражении задается сопоставление бекапируемого топика (kafka-topic) с целевым бакетом (bucketAddress) и не обязательным префиксом пути (pathPrefix) в S3. Формат KCQL-синтаксиса для sink-коннектора:
INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
'property.1'=x,
'property.2'=x,
)]
Экранирование доступно для операторов INSERT INTO, SELECT * FROM и PARTITIONBY. Например, во входящем сообщении Kafka, сохраненном в формате JSON, могут использоваться поля, содержащие «.»:
{
...
"a.b": "value",
...
}
В этом случае используйте следующий оператор KCQL:
INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`
Целевое хранилище#
Целевое S3 хранилище указываются в команде INSERT INTO. Если путь не указан, коннектор будет записывать данные в корень корзины и добавлять к пути имя темы.
Вот несколько примеров:
INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;
Исходный топик#
Исходный топик определяется в команде FROM. Чтобы избежать ошибок во время выполнения, найстройте свойство topics или topics.regex в коннекторе и обеспечите правильное сопоставление с операторами KCQL.
Конструкция FROM * автоматически сопоставляет топик с партицией.
Пример конфигурации#
name=local-s3-sink
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=test
connect.s3.custom.endpoint={{ ENDPOINT }}
connect.s3.vhost.bucket=true
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=none
connect.s3.aws.secret.key=none
connect.s3.kcql=insert into test select * from test STOREAS `TEXT` PROPERTIES ('flush.count'=1)