我正在使用Scala,在独立机器(装有Windows 10的PC(上用Spark编程。我是一个新手,没有用scala和Spark编程的经验。所以我会非常感谢你的帮助。
问题:
我有一个HashMap,hMap1,其值是HashSets of Integer条目(HashMap>(。然后,我将其值(即许多HashSet值(存储在RDD中。代码如下
val rdd1 = sc.parallelize(Seq(hMap1.values()))
现在我有另一个相同类型的HashMap,hMap2,即HashMap>。其值也存储在RDD中,作为
val rdd2 = sc.parallelize(Seq(hMap2.values()))
我想知道如何将 hMap1 和 hMap2 的值相交
例如:
输入:
rdd1 = [2, 3], [1, 109], [88, 17]
中的数据
和rdd2 = [2, 3], [1, 109], [5,45]
中的数据
输出
所以输出 = [2, 3], [1, 109]
问题陈述
我对你的问题的理解如下:
给定两个
RDD[Set[Integer]]
类型的 RDD,我怎样才能生成它们公共记录的RDD
。
示例数据
生成的两个 RDD
val rdd1 = sc.parallelize(Seq(Set(2, 3), Set(1, 109), Set(88, 17)))
val rdd2 = sc.parallelize(Seq(Set(2, 3), Set(1, 109), Set(5, 45)))
可能的解决方案
如果我对问题陈述的理解是正确的,如果你的RDD和我想象的那样,你可以使用rdd1.intersection(rdd2)
。这是我在Spark 2.2.0的火花壳上尝试的:
rdd1.intersection(rdd2).collect
这产生了输出:
Array(Set(2, 3), Set(1, 109))
这是有效的,因为Spark可以比较类型Set[Integer]
的元素,但请注意,除非您定义了MyObject
的相等协定,否则这不能推广到任何对象Set[MyObject]
。