自定义条件下合并Spark RDD中的元素?
假设有一个RDD[Seq[Int]],其中这个RDD中的一些Seq[Int]包含重叠的元素。任务是合并此RDD中所有重叠的Seq[Int],并将结果存储到新的RDD中。
例如,假设 RDD[Seq[Int]] = [[1,2,3], [2,4,5], [1,2], [7,8,9]],结果应该是 [[1,2,3,4,5], [7,8,9]]。
由于RDD[Seq[Int]]非常大,我无法在驱动程序中执行此操作。是否可以使用distributed groupBy/map/reduce等来完成它?
终于自己解决了。
这个问题可以转换为计算由RDD[Seq[Int]]中的元素形成的所有连接组件,因为合并条件(两个Seq[Int]具有重叠的整数)表示两个Seq[Int]之间的连接。
基本思想是:
- 给RDD[Seq[Int]]中的每个元素一个唯一的键(.zipWithUniqueId) 按
- 生成的键对 Seq[Int] 中的整数进行分组,因此出现在多个 Seq[Int] 中的整数将具有相应的键组合在一起
- 生成 RDD 图,其中边缘是步骤 2 中同一组中的关键对
使用 GraphX 计算连接的组件,并联接结果
val sets = Seq(Seq(1,2,3,4), Seq(4,5), Seq(1,2,3), Seq(6,7,8), Seq(9,10), Seq(7,9)) val rddSets = sc.parallelize(sets) .zipWithUniqueId .map(x => (x._2, x._1)).cache() val edges = rddSets.flatMap(s => s._2.map(i => (i, s._1))) .groupByKey.flatMap(g => { var first = g._2.head for (v <- g._2.drop(1)) yield { val pair = (first, v) first = v pair } }).flatMap(e => Seq((e._1, e._2), (e._2, e._1))) val vertices = Graph.fromEdgeTuples[Long](edges, defaultValue = 0) .connectedComponents.vertices rddSets.join(vertices).map(x => (x._2._2, x._2._1)) .reduceByKey((s1, s2) => s1.union(s2).distinct) .collect().foreach(x => println (x._2.toString()))