访问flatMapToPair内部的HashMap



编辑:已使用RDD.collectAsMap()解决

我正试图复制第28-30页的问题解决方案http://on-demand.gputechconf.com/gtc/2016/presentation/S6424-michela-taufer-apache-spark.pdf

我有一个在map函数之外实例化的HashMap。HashMap包含以下数据:

{1:2, 2:3, 3:2, 4:2, 5:3}

以前定义的RDD以前的RDD具有以下类型:

JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>>

有数据:

1: [(1,2), (1,5)]
2: [(2,1), (2,3), (2,5)]
3: [(3,2), (3,4)]
4: [(4,3), (4,5)]
5: [(5,1), (5,2), (5,4)]

我尝试用flatMapToPair:创建一个新的RDD

JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, Integer, Integer>() {
@Override
public Iterator<Tuple2<Integer, Integer>> call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> integerIterableTuple2) throws Exception {
Integer count;
ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
count = hashMap.get(integerIterableTuple2._1);
for (Tuple2<Integer, Integer> t : integerIterableTuple2._2) {
Integer tcount = hashMap.get(t._2);
if (count < tcount || (count.equals(tcount) && integerIterableTuple2._1 < t._2)) {
list.add(t);
}
}
return list.iterator();
}
});

但在这种情况下,for循环中的hashMap.get(t._2)在大多数情况下都会获得NULL。我已经检查了HashMap中是否有正确的值。

有没有一种方法可以在Spark函数中正确地获取HashMap的值?

它应该可以工作。Spark应该捕获您的变量,对其进行序列化,并将其与每个任务一起发送给每个工作者。你可以尝试广播这个地图

sc.broadcast(hashMap)

并使用该结果来代替CCD_ 3。它在内存方面也更高效(每个执行器共享存储(。

我在类变量方面也遇到过类似的问题。您可以尝试将您的变量设为本地变量或再声明一个变量,如下所示:

Map localMap = hashMap;
JavaPairRDD<Integer, Integer> newRDD = previousRDD.flatMapToPair(
...
Integer tcount = localMap.get(t._2);
...
);

我认为这是由于火花序列化机制。你可以在这里阅读更多关于它的信息。

最新更新