我正在编写一个 Kafka 连接器,以便从 Github 上的多个来源(文本和 yaml 文件(下载一些数据,并将它们转换为某个类的对象,该对象是从 avsc 文件自动生成的:
{
"type": "record",
"name": "MatomoRecord",
"fields": [
{"name": "name", "type": "string"},
{"name": "type", "type": "string"},
{"name": "timestamp", "type": "long"}
]
}
到目前为止,一切都很成功。所以现在我有一个对象映射,我想把它保留在 Kafka 主题中。为此,我正在尝试创建源记录:
for (Map.Entry<String, MatomoRecord> record : records.entrySet()) {
sourceRecords.add(new SourceRecord(
sourcePartition,
sourceOffset,
matomoTopic,
0,
org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
record.getKey(),
matomoSchema,
record.getValue())
);
}
如何基于 avro 模式定义 org.apache.kafka.connect.data.schema 类型的值架构?对于测试,我使用构建器手动创建了一个架构:
Schema matomoSchema = SchemaBuilder.struct()
.name("MatomoRecord")
.field("name", Schema.STRING_SCHEMA)
.field("type", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA)
.build();
结果是:
org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class MatomoRecord
sombody 可以帮助我基于 avro 模式定义值模式吗?
此致敬意马丁
使用record.getValue()
,也没有来自Avro的直接API来连接模式(没有Confluent的AvroConverter的内部方法(
您需要将该对象解析为与您定义的架构匹配的Struct
对象(假设您的任何对象字段都不能为 null,这看起来不错(
查看Javadoc了解如何定义它 https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html
注意(在这里无关紧要(,嵌套结构应该从"自下而上"构建,您将子结构/数组put
到父结构/数组中。
连接器不一定依赖于 Avro,只能包含模型对象。转换器接口负责将带有架构的结构转换为其他数据格式(JSON,Confluent的Avro编码,Protobuf等(
KC Schema 是一个 JSON 模式,看起来非常像 Avro 模式。尝试org.apache.kafka.connect.json.JsonConverter#asConnectSchema
- 您可能需要按摩 Avro 模式才能使其正常工作。