如何在Apache Kafka中处理AVRO序列化的嵌套源数据?



我的目标是从HTTP源获取JSON数据,并使用AVRO序列化将其存储在Kafka主题中。

使用Kafka Connect和HTTP源连接器以及一堆smt,我设法创建了一个连接数据结构,当使用StringConverter写入主题时看起来像这样:

=

结构{基地站,鳕鱼= 200,coord = Struct {lat = 54.0,朗= 9.0},dt = 1632150605}

因此,JSON被成功地解析为struct,我可以使用smt操作单个元素。接下来,我在Confluent schema Registry中创建了一个具有相应模式的新主题,并将连接器的值转换器切换到具有"value.converter": "io.confluent.connect.avro.AvroConverter"的Confluent AVRO转换器。

而不是预期的序列化,我得到一个错误消息说:

org.apache.kafka. mon.errors. serializationexception:序列化Avro消息错误由于:org.apache.avro.SchemaParseException:无法重新定义:io.confluent.connect.avro.ConnectDefault

只要我用ReplaceField删除嵌套的STRUCT或用Flatten简化结构,AVRO序列化就会像魅力一样工作。因此,看起来转换器不能处理嵌套结构。

当你有嵌套元素并希望它们被序列化时,而不是将JSON存储为字符串并试图在消费者或其他地方处理对象创建时,正确的方法是什么?这在Kafka Connect中是可能的吗?

从JSON String创建STRUCT元素可以通过不同的方式实现。最初,使用SMT ExpandJson是为了简单。但是,它没有创建充分命名的struct,因为它没有模式可以使用。这就是AVRO序列化器为这些struct使用泛型类io.confluent.connect.avro.ConnectDefault时导致初始错误消息的原因,如果有多个struct,就会产生歧义,从而抛出异常。

另一个看起来做同样事情的SMT是Json Schema,它有一个文档化的FromJson转换。它确实接受模式,从而解决了ExpandJson将嵌套元素解析为泛型类型的问题。不过,接受的是JSON模式,并且通过使用单词"properties"来映射到AVRO的全名。作为命名空间并复制字段名。在本例中,您最终将使用properties.coord作为内部元素的全名。

作为一个例子,当以下JSON Schema被传递给SMT时:

{
"$schema": "http://json-schema.org/draft-04/schema#",
"type": "object",
"properties": {
"coord": {
"type": "object",
"properties": {
"lon": {
"type": "number"
},
"lat": {
"type": "number"
}
},
"required": [
"lon",
"lat"
]
},
...
}

它生成的AVRO模式(因此在模式注册表中查找)是:

{
"type": "record",
"fields": [
...
{
"name": "coord",
"type": {
"type": "record",
"name": "coord",
"namespace": "properties",
"fields": [
{
"name": "lat",
"type": "double"
},
{
"name": "lon",
"type": "double"
}
],
"connect.name": "properties.coord"
}
},
...
}

理论上,如果在第二层有另一个具有coord元素的模式,它将获得相同的全名,但由于这些不是需要引用的schema Registry中的单个条目,因此不会导致冲突。不能从JSON模式中控制AVRO记录的名称空间有点遗憾,因为感觉您就在那里,但是我还没有能够深入挖掘以提供解决方案。

建议的SMT SetSchemaMetadata(请参阅对该问题的第一个答复)在此过程中可能很有用,但是它的文档与AVRO命名约定有一点冲突,因为它在示例中显示了order-value。它将尝试查找包含以该名称作为根元素的AVRO记录的模式,并且由于'-'在AVRO名称中是非法字符,因此会得到一个错误。但是,如果您使用了根元素的正确名称,SMT会做一些非常有用的事情:它的RestService类(查询Schema Registry以查找匹配的模式)会失败,并显示一条打印出需要创建的确切模式定义的消息,因此您不必记住所有的转换规则。

因此,原始问题的答案是:是的,它可以用Kafka Connect完成。这也是最好的方式,如果你

  • 不想写自己的生产者/连接器
  • 希望以一种类型的方式存储JSON blob,而不是在它们到达初始主题后转换它们

如果可以在数据摄取后进行转换,那么ksqlDB的反、重和序列化功能似乎相当强大。

最新更新