Бекапирование топика в S3#

Поддерживается только SELECT * запрос

Коннектор будет корректно записывать все поля из Kafka в том виде, в котором они есть.

Класс коннектора#

io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector

Поддержка KCQL#

Можно указать несколько операторов KCQL, разделенных «;» для работы с несколькими топиками. Для свойств соединителя topics или topics.regex задайте значение, соответствующее операторам KCQL.

В KCQL выражении задается сопоставление бекапируемого топика (kafka-topic) с целевым бакетом (bucketAddress) и не обязательным префиксом пути (pathPrefix) в S3. Формат KCQL-синтаксиса для sink-коннектора:

INSERT INTO bucketAddress[:pathPrefix]
SELECT *
FROM kafka-topic
[[PARTITIONBY (partition[, partition] ...)] | NOPARTITION]
[STOREAS storage_format]
[PROPERTIES(
  'property.1'=x,
  'property.2'=x,
)]

Экранирование доступно для операторов INSERT INTO, SELECT * FROM и PARTITIONBY. Например, во входящем сообщении Kafka, сохраненном в формате JSON, могут использоваться поля, содержащие «.»:

{
  ...
  "a.b": "value",
  ...
}

В этом случае используйте следующий оператор KCQL:

INSERT INTO `bucket-name`:`prefix` SELECT * FROM `kafka-topic` PARTITIONBY `a.b`

Целевое хранилище#

Целевое S3 хранилище указываются в команде INSERT INTO. Если путь не указан, коннектор будет записывать данные в корень корзины и добавлять к пути имя темы.

Вот несколько примеров:

INSERT INTO testbucket:pathToWriteTo SELECT * FROM topicA;
INSERT INTO testbucket SELECT * FROM topicA;
INSERT INTO testbucket:path/To/Write/To SELECT * FROM topicA PARTITIONBY fieldA;

Исходный топик#

Исходный топик определяется в команде FROM. Чтобы избежать ошибок во время выполнения, найстройте свойство topics или topics.regex в коннекторе и обеспечите правильное сопоставление с операторами KCQL.

Конструкция FROM * автоматически сопоставляет топик с партицией.

Пример конфигурации#

name=local-s3-sink
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
topics=test

connect.s3.custom.endpoint={{ ENDPOINT }}
connect.s3.vhost.bucket=true
connect.s3.aws.auth.mode=Credentials
connect.s3.aws.region=eu-west-2
connect.s3.aws.access.key=none
connect.s3.aws.secret.key=none
connect.s3.kcql=insert into test select * from test STOREAS `TEXT` PROPERTIES ('flush.count'=1)