从对象构建新的源记录



我正在编写一个 Kafka 连接器,以便从 Github 上的多个来源(文本和 yaml 文件(下载一些数据,并将它们转换为某个类的对象,该对象是从 avsc 文件自动生成的:

{
  "type": "record",
  "name": "MatomoRecord",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "type", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}

到目前为止,一切都很成功。所以现在我有一个对象映射,我想把它保留在 Kafka 主题中。为此,我正在尝试创建源记录:

for (Map.Entry<String, MatomoRecord> record : records.entrySet()) {
  sourceRecords.add(new SourceRecord(
    sourcePartition,
    sourceOffset,
    matomoTopic,
    0,
    org.apache.kafka.connect.data.Schema.STRING_SCHEMA,
    record.getKey(),
    matomoSchema,
    record.getValue())
  );
}

如何基于 avro 模式定义 org.apache.kafka.connect.data.schema 类型的值架构?对于测试,我使用构建器手动创建了一个架构:

Schema matomoSchema = SchemaBuilder.struct()
                .name("MatomoRecord")
                .field("name", Schema.STRING_SCHEMA)
                .field("type", Schema.STRING_SCHEMA)
                .field("timestamp", Schema.INT64_SCHEMA)
                .build();

结果是:

org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class MatomoRecord

sombody 可以帮助我基于 avro 模式定义值模式吗?

此致敬意马丁

你不能

使用record.getValue(),也没有来自Avro的直接API来连接模式(没有Confluent的AvroConverter的内部方法(

您需要将该对象解析为与您定义的架构匹配的Struct对象(假设您的任何对象字段都不能为 null,这看起来不错(

查看Javadoc了解如何定义它 https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html

注意(在这里无关紧要(,嵌套结构应该从"自下而上"构建,您将子结构/数组put到父结构/数组中。

连接器不一定依赖于 Avro,只能包含模型对象。转换器接口负责将带有架构的结构转换为其他数据格式(JSON,Confluent的Avro编码,Protobuf等(

KC Schema 是一个 JSON 模式,看起来非常像 Avro 模式。尝试org.apache.kafka.connect.json.JsonConverter#asConnectSchema - 您可能需要按摩 Avro 模式才能使其正常工作。

相关内容

  • 没有找到相关文章

最新更新