在集成测试中,在flink迷你集群上测试流时遇到了一个问题。流映射生成的Avro SpecificRecord Pojo类(Java(。
流作业是用Scala编写的。
flink运行时崩溃,因为它无法实例化org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
这是堆栈跟踪:
stack: java.lang.ClassCastException: class org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils
java.lang.RuntimeException: Could not instantiate org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.
at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:53)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:572)
我认为问题是Flink无法序列化Avro Pojo类,因为该类中有多个嵌套的Avro Poho类。
我试图为所有嵌套的Pojo类类型添加所有类型信息,但仍然遇到了相同的问题。
所以现在我想知道是否有人用嵌套的Avro Pojo类生成了一个Flink Job。所有类都继承了SpecificRecord类型,并且是从avro模式生成的。
是否有某种特殊的序列化程序需要编写?对于这样一个处理Scala或Java中多个嵌套Pojo类的Serializer,有什么文档或例子吗?
或者这是一个完全不同的问题?
非常感谢您的帮助!
如果flink-avro
不在类路径中,则可能会出现此问题。如果你无论如何都在使用Avro,我会完全禁用Kryo,以捕捉更细微的错误。
我通过在流程函数内部进行解析来实现它。
我必须将一个字符串解析为json,然后解析为SpecificRecord类的一个特定字段的Record类,该字段应该最终在DataLink中。
json的解析现在在另一个ProcessFuncton中实现,现在它可以工作了。在我将映射中的解析直接应用于数据流之前。