spark-scala中向量的子集操作



我有一个形式的RDD curRdd

res10: org.apache.spark.rdd.RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)] = ShuffledRDD[102]

其中CCD_ 2产生以下结果。

Array((Vector((5,2)),1), (Vector((1,1)),2), (Vector((1,1), (5,2)),2))

这里:int对的向量和计数

现在,我想通过向下渗透计数将其转换为另一个相同形式RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)]的RDD。

(Vector((1,1), (5,2)),2))将其计数2贡献给作为其子集的任何密钥,如同(Vector((5,2)),1)变为(Vector((5,2)),3)

对于上面的例子,我们的新RDD将具有

(Vector((5,2)),3), (Vector((1,1)),4), (Vector((1,1), (5,2)),2)

我该如何做到这一点?请帮忙。

首先可以介绍Seq:的subsets操作

implicit class SubSetsOps[T](val elems: Seq[T]) extends AnyVal {
    def subsets: Vector[Seq[T]] = elems match {
      case Seq() => Vector(elems)
      case elem +: rest => {
        val recur = rest.subsets
        recur ++ recur.map(elem +: _)
      }
    }
  }

empty子集始终是结果向量中的第一个元素,因此可以使用.tail 省略它

现在,您的任务非常明显map-reduce,就RDD:而言,这就是flatMap-reduceByKey

 val result = curRdd
      .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
      .reduceByKey(_ + _)

更新

这个实现可能会在结果中引入新的集合,如果你只想选择原始集合中出现的集合,你可以将结果与原始集合连接:

val result = curRdd
  .flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
  .reduceByKey(_ + _)
  .join(curRdd map identity[(Seq[(Int, Int)], Int)])
  .map { case (key, (v, _)) => (key, v) }

注意,需要map identity步骤来将密钥类型从原始RDD中的Vector[_]转换为Seq[_]。您可以修改curRdd.collect()0定义,将Seq[T]的所有出现时间替换为Vector[T],或者按照以下(硬编码scala.collection)方式更改定义:

import scala.collection.SeqLike
import scala.collection.generic.CanBuildFrom
implicit class SubSetsOps[T, F[e] <: SeqLike[e, F[e]]](val elems: F[T]) extends AnyVal {
    def subsets(implicit cbf: CanBuildFrom[F[T], T, F[T]]): Vector[F[T]] = elems match {
      case Seq() => Vector(elems)
      case elem +: rest => {
        val recur = rest.subsets
        recur ++ recur.map(elem +: _)
      }
    }
  }

最新更新