在Flink Mini Cluster(1.11)和AvroKryoSerializerUtils上运行流不起作用



在集成测试中,在flink迷你集群上测试流时遇到了一个问题。流映射生成的Avro SpecificRecord Pojo类(Java(。

流作业是用Scala编写的。

flink运行时崩溃,因为它无法实例化org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils

这是堆栈跟踪:

stack: java.lang.ClassCastException: class org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
java.lang.RuntimeException: Could not instantiate org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.
at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:53)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:572)

我认为问题是Flink无法序列化Avro Pojo类,因为该类中有多个嵌套的Avro Poho类。

我试图为所有嵌套的Pojo类类型添加所有类型信息,但仍然遇到了相同的问题。

所以现在我想知道是否有人用嵌套的Avro Pojo类生成了一个Flink Job。所有类都继承了SpecificRecord类型,并且是从avro模式生成的。

是否有某种特殊的序列化程序需要编写?对于这样一个处理Scala或Java中多个嵌套Pojo类的Serializer,有什么文档或例子吗?

或者这是一个完全不同的问题?

非常感谢您的帮助!

如果flink-avro不在类路径中,则可能会出现此问题。如果你无论如何都在使用Avro,我会完全禁用Kryo,以捕捉更细微的错误。

我通过在流程函数内部进行解析来实现它。

我必须将一个字符串解析为json,然后解析为SpecificRecord类的一个特定字段的Record类,该字段应该最终在DataLink中。

json的解析现在在另一个ProcessFuncton中实现,现在它可以工作了。在我将映射中的解析直接应用于数据流之前。

相关内容

  • 没有找到相关文章

最新更新