Kafka connect S3源连接器忽略键



我在AWS S3中有以下文件,由Kafka connect sink连接器备份:

test_yordan_kafka_connect+0+0000023000.json
test_yordan_kafka_connect+0+0000023000.keys.json

当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息:

DEBUG [source-s3|task-0] Removing test-bucket/topics/test_yordan_kafka_connect/partition=1/test_yordan_kafka_connect+1+0000000000.keys.json from common prefixes. (io.confluent.connect.s3.source.S3Storage:333)

我的源配置是这样的:

"connector.class":"io.confluent.connect.s3.source.S3SourceConnector",
"tasks.max":"1",
"s3.region":"eu-central-1",
"s3.bucket.name":"test-bucket",
"topics.dir":"test-bucket/topics",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility":"NONE",
"confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
"transforms":"AddPrefix",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy

我应该做些什么来让密钥和消息一起存储在Kafka中?

我忽略了如何确保在从S3读取数据时考虑.keys.json来构建Kafka密钥。

我用来实现同样事情的一个技巧是简单地依赖ValueToKeySMT变压器:如果键值也是值有效负载的一部分,那么将其注入键就像在源连接器配置中添加org.apache.kafka.connect.transforms.ValueToKey变压器的实例一样简单。

以下是相关文档:https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html

这里有一篇关于这个变压器的很棒的博客文章(12篇同样很棒的系列文章的一部分)https://rmoff.net/2020/12/09/twelve-days-of-smt-day-2-valuetokey-and-extractfield/

在我的具体情况下,ValueToKey的内置行为并没有做我想要的,所以我最终编写了我自己的CustomKeyToValue转换器,由我的接收器连接器使用,以确保密钥存储在S3中,然后在源连接器上使用CustomValueToKey将东西重建到Kafka中。

编写这样的自定义转换器实际上非常容易,它本质上只是在一个java文件中包含20行代码,然后可以使用kafka-connect-maven-plugin进行打包。下面是内置ValueToKey的示例代码,可以作为灵感:

https://github.com/apache/kafka/blob/28f013708ffe8e48e46f408c7f570bf2cd5c54b2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java/java/org/apache/kafka/connect/transforms/ValueToKey.java

我们也可以应用同样的原则来保存/检索Kafka头文件。

最新更新