我设置了一个融合的 s3 接收器连接,它将 .avro 文件存储在 s3 中。
我转储这些文件,发现它们只是消息本身,我不知道在哪里可以找到消息密钥,知道吗?
配置如下:
{
"name": "s3-sink-test",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "1",
"topics": "book",
"s3.region": "eu-central-1",
"s3.bucket.name": "kafka",
"s3.part.size": "5242880",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",
"flush.size": "2",
"schema.compatibility": "NONE"
}
}
编辑 代码已更改以启用此功能,并且不需要以下转换 - docs
开箱即用的 Kafka 连接器不会受到任何存储 Kafka 连接器的保留
尝试编译和设置存档转换,可以使用连接配置中的这些属性进行设置
"transforms" : "Archive",
"transforms.Archive.type" : "com.github.jcustenborder.kafka.connect.archive.Archive"
有关 Kafka Connect 中的 SMT 的更多信息,请参阅此博客文章