我在Apache Flink中有以下代码。当它在远程集群上运行时,它在本地集群中运行良好。在包含命令"stack.prush(recordPair);"的行中生成NullPointerException错误。
有人知道吗,原因是什么?
本地集群和远程集群的输入数据集是相同的。
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair ;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map ;
private static Stack<Tuple2< Integer,Integer>> stack ;
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
stack = new Stack<Tuple2< Integer,Integer>>();
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
问题是在TC
类的构造函数中初始化stack
变量。这只为运行客户端程序的JVM初始化静态变量。对于本地执行,这是有效的,因为Flink作业是在同一JVM中执行的。
当您在集群上运行它时,TC
将被序列化并运送到集群节点。在那里,实例的反序列化不会再次调用构造函数来初始化stack
。为了实现这一点,您应该将初始化逻辑移动到RichFlatMapFunction
的open
方法,或者使用静态初始化器。但请注意,在同一个TaskManager
上运行的所有运算符都将共享stack
的同一实例,因为它是一个类变量。
public static class TC extends RichFlatMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static TreeSet<Tuple2<Integer, Integer>> treeSet_duplicate_pair;
private static HashMap< Integer, Set<Integer>> clusters_duplicate_map;
// either use a static initializer
private static Stack<Tuple2< Integer,Integer>> stack = new Stack<Tuple2< Integer,Integer>>();
public TC(List<Tuple2<Integer, Integer>> duplicatsPairs) {
...
}
@Override
public void open(Configuration config) {
// or initialize stack here, but here you have to synchronize the initialization
...
}
@Override
public void flatMap(Tuple2<Integer, Integer> recordPair, Collector<Tuple2<Integer, Integer>> out) throws Exception {
if (recordPair!= null)
{
stack.push(recordPair);
...
}
}
}