我在AWS S3中有以下文件,由Kafka connect sink连接器备份:
test_yordan_kafka_connect+0+0000023000.json
test_yordan_kafka_connect+0+0000023000.keys.json
当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息:
DEBUG [source-s3|task-0] Removing test-bucket/topics/test_yordan_kafka_connect/partition=1/test_yordan_kafka_connect+1+0000000000.keys.json from common prefixes. (io.confluent.connect.s3.source.S3Storage:333)
我的源配置是这样的:
"connector.class":"io.confluent.connect.s3.source.S3SourceConnector",
"tasks.max":"1",
"s3.region":"eu-central-1",
"s3.bucket.name":"test-bucket",
"topics.dir":"test-bucket/topics",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility":"NONE",
"confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
"transforms":"AddPrefix",
"transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex":".*",
"transforms.AddPrefix.replacement":"$0_copy
我应该做些什么来让密钥和消息一起存储在Kafka中?
我忽略了如何确保在从S3读取数据时考虑.keys.json
来构建Kafka密钥。
我用来实现同样事情的一个技巧是简单地依赖ValueToKey
SMT变压器:如果键值也是值有效负载的一部分,那么将其注入键就像在源连接器配置中添加org.apache.kafka.connect.transforms.ValueToKey
变压器的实例一样简单。
以下是相关文档:https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
这里有一篇关于这个变压器的很棒的博客文章(12篇同样很棒的系列文章的一部分)https://rmoff.net/2020/12/09/twelve-days-of-smt-day-2-valuetokey-and-extractfield/
在我的具体情况下,ValueToKey
的内置行为并没有做我想要的,所以我最终编写了我自己的CustomKeyToValue
转换器,由我的接收器连接器使用,以确保密钥存储在S3中,然后在源连接器上使用CustomValueToKey
将东西重建到Kafka中。
编写这样的自定义转换器实际上非常容易,它本质上只是在一个java文件中包含20行代码,然后可以使用kafka-connect-maven-plugin
进行打包。下面是内置ValueToKey
的示例代码,可以作为灵感:
https://github.com/apache/kafka/blob/28f013708ffe8e48e46f408c7f570bf2cd5c54b2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java/java/org/apache/kafka/connect/transforms/ValueToKey.java
我们也可以应用同样的原则来保存/检索Kafka头文件。