我使用独立方法手动安装了Confluent Kafka Connect S3,而不是通过Confluent的进程或作为整个平台的一部分。
我可以使用以下命令从命令行成功启动连接器:
./kafka_2.11-2.1.0/bin/connect-standalone.sh connect.properties s3-sink.properties
可以看到来自 AWS MSK 的主题 CDC 偏移量正在被使用。不会引发任何错误。但是,在 AWS S3 中,不会为新数据创建文件夹结构,也不会存储 JSON 数据。
问题
- 连接器是否应动态创建文件夹结构 看到主题的第一个 JSON 数据包?
- 除了配置 awscli credentials、connect.properties 和 s3-sink.properties 是 还有任何其他设置需要设置才能正确连接到 S3 存储桶?
- 有关安装文档的建议 更多 比 Confluent 网站上的独立文档更全面?(上面链接)
连接属性
引导服务器=已编辑:9092,已编辑:9092,已编辑:9092
plugin.path=/plugins/kafka-connect-s3 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets
S3-sink.properties
name=s3-sink connector.class=io.confluent.connect.s3.S3SinkConnector 任务.max=1 主题=database_schema_topic1,database_schema_topic2,database_schema_topic3 s3.region=us-east-2 s3.bucket.name=databasekafka s3.part.size=5242880 flush.size=1 storage.class=io.confluent.connect.s3.storage.S3Storage format.class=io.confluent.connect.s3.format.json.JsonFormat schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner schema.compatibility = NONE
连接器是否应该在看到主题的第一个 JSON 数据包时动态创建文件夹结构?是的,即使您使用参数"topics.dir"和"path.format"控制此路径(目录结构)
除了配置 awscli 凭证之外,connect.properties 和 s3-sink.properties 是否需要设置任何其他设置才能正确连接到 S3 存储桶?默认情况下,S3 连接器将通过环境变量或凭证文件使用 Aws 凭证(访问 ID 和私有密钥)。 您可以通过修改参数"s3.credentials.provider.class"进行更改。参数的默认值为"DefaultAWSCredentialsProviderChain">
关于安装文档的建议比 Confluent 网站上的独立文档更全面?(上面链接)我建议您使用分布式模式,因为它为连接群集和在其上运行的连接器提供了高可用性。 可以通过以下文档在分布式模式下配置连接群集。 https://docs.confluent.io/current/connect/userguide.html#connect-userguide-dist-worker-config