在自定义条件下合并 Spark RDD 中的元素


如何在

自定义条件下合并Spark RDD中的元素?

假设有一个RDD[Seq[Int]],其中这个RDD中的一些Seq[Int]包含重叠的元素。任务是合并此RDD中所有重叠的Seq[Int],并将结果存储到新的RDD中。

例如,假设 RDD[Seq[Int]] = [[1,2,3], [2,4,5], [1,2], [7,8,9]

],结果应该是 [[1,2,3,4,5], [7,8,9]]。

由于RDD[Seq[Int]]非常大,我无法在驱动程序中执行此操作。是否可以使用distributed groupBy/map/reduce等来完成它?

终于自己解决了。

这个问题可以转换为计算由RDD[Seq[Int]]中的元素形成的所有连接组件,因为合并条件(两个Seq[Int]具有重叠的整数)表示两个Seq[Int]之间的连接。

基本思想是:

  1. 给RDD[Seq[Int]]中的每个元素一个唯一的键(.zipWithUniqueId)
  2. 生成的键对 Seq[Int] 中的整数进行分组,因此出现在多个 Seq[Int] 中的整数将具有相应的键组合在一起
  3. 生成 RDD 图,其中边缘是步骤 2 中同一组中的关键对
  4. 使用 GraphX 计算连接的组件,并联接结果

    val sets = Seq(Seq(1,2,3,4), Seq(4,5), Seq(1,2,3), Seq(6,7,8), Seq(9,10), Seq(7,9))
    val rddSets = sc.parallelize(sets)
                    .zipWithUniqueId
                    .map(x => (x._2, x._1)).cache()
    val edges = rddSets.flatMap(s => s._2.map(i => (i, s._1)))
                       .groupByKey.flatMap(g => {
                           var first = g._2.head
                           for (v <- g._2.drop(1)) yield {
                             val pair = (first, v)
                             first = v
                             pair
                            }
                       }).flatMap(e => Seq((e._1, e._2), (e._2, e._1)))
    val vertices = Graph.fromEdgeTuples[Long](edges, defaultValue = 0)
                        .connectedComponents.vertices
    rddSets.join(vertices).map(x => (x._2._2, x._2._1))
           .reduceByKey((s1, s2) => s1.union(s2).distinct)
           .collect().foreach(x => println (x._2.toString()))
    

相关内容

  • 没有找到相关文章

最新更新