我在 Kafka 主题中有数据,我想将其保留在我的数据湖中。
在担心密钥之前,我能够使用 HdfsSinkConnector 将 Avro 值保存在数据湖上的文件中。每个文件中的消息值数由 HdfsSinkConnector 的"flush.size"属性确定。
都很好。接下来,我也想保留钥匙。为此,我使用了 kafka-connect-transform-archive,它将字符串键和 Avro 值包装到一个新的 Avro 模式中。
这很好用...除了 HdfsSinkConnector 的 flush.size 现在被忽略了。保存在数据湖中的每个文件只有 1 条消息。
因此,这两种情况是 1) 仅保存值,每个文件中的值数由 flush.size 确定,以及 2) 保存键和值,每个文件只包含一个消息,并且忽略 flush.size。
这两种情况之间的唯一区别是指定存档转换的 HdfsSinkConnector 的配置。
"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"
kafka-connect-transform-archive 是否在设计上忽略了刷新大小,或者我是否需要一些额外的配置才能在数据湖上为每个文件保存多个键值消息?
我在使用 Kafka GCS 接收器连接器时遇到了同样的问题。
在com.github.jcustenborder.kafka.connect.archive.Archive代码中,每条消息都会创建一个新的架构。
private R applyWithSchema(R r) {
final Schema schema = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.archive.Storage")
.field("key", r.keySchema())
.field("value", r.valueSchema())
.field("topic", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA);
Struct value = new Struct(schema)
.put("key", r.key())
.put("value", r.value())
.put("topic", r.topic())
.put("timestamp", r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());
}
如果您查看 kafka transform InsertField$Value 方法,您会发现它使用 SyncdCache 以便每次都检索相同的模式。
https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L170
因此,您只需要创建一个架构(在应用函数之外)或使用相同的同步缓存代码。