java.util.List 和 java.util.Map 的 Flink 序列化



My Flink 管道目前使用一个 Pojo,其中包含一些列表和映射(字符串(,如下所示:

public class MyPojo {
private List<String> myList = new ArrayList<>();
private OtherPojo otherPojo = new OtherPojo();
// getters + setters...
}
public class OtherPojo {
private Map<String, String> myMap = new HashMap<>();
// getters + setters...
}

出于性能原因,我想绕过 Kryo 序列化,所以我禁用了env.getConfig().disableGenericTypes();的通用回退,如 Flink 文档中所述。

现在,Flink 抱怨这些列表:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:250)
at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
...

在 Flink 中序列化这些简单的列表和映射的首选方法是什么?在内部,这些目前是ArrayListHashMap,但其他实现也可以。Flink 中似乎有一个类org.apache.flink.api.common.typeutils.base.ListSerializer,但我不知道如何使用它。

Marius 已经很好地解释了原因,尽管我不明白 Flink 不支持您的开箱即用用例的原因。不过,我将添加现在有效的解决方案。

// create type info
final TypeInformation<OtherPojo> otherPojoInfo = Types.POJO(OtherPojo.class, 
ImmutableMap.of("myMap", Types.MAP(Types.STRING, Types.STRING)));
final TypeInformation<MyPojo> myPojoInfo = Types.POJO(MyPojo.class,
ImmutableMap.of("myList", Types.LIST(Types.STRING), "otherPojo", otherPojoInfo));
// test it
final MyPojo myPojo = new MyPojo();
myPojo.getMyList().add("test");
myPojo.getOtherPojo().getMyMap().put("ping", "pong");
final TypeSerializer<MyPojo> serializer = myPojoInfo.createSerializer(env.getConfig());
DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(100);
serializer.serialize(myPojo, dataOutputSerializer);
DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(dataOutputSerializer.getSharedBuffer());
final MyPojo clone = serializer.deserialize(dataInputDeserializer);
assert(myPojo.equals(clone));

请注意,测试代码中糟糕的访问模式只是为了快速而肮脏的演示。

如果你这样做:

env.getConfig().disableGenericTypes();

每当遇到通过 Kryo 的数据类型时,它都会引发异常。

因此,在这种情况下,您必须编写自己的序列化程序。 可以使用TypeSerializer创建,只需在TypeInformation对象上调用typeInfo.createSerializer(config)即可。

对于泛型类型,您需要通过 TypeHint"捕获"泛型类型信息,对于列表:

TypeInformation<List<Object>> info = TypeInformation.of(new TypeHint<List<Object>>(){});

列表类型信息类

更多细节在这里。

相关内容

最新更新