我有一个形式的RDD curRdd
res10: org.apache.spark.rdd.RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)] = ShuffledRDD[102]
其中CCD_ 2产生以下结果。
Array((Vector((5,2)),1), (Vector((1,1)),2), (Vector((1,1), (5,2)),2))
这里键:int对的向量和值计数
现在,我想通过向下渗透计数将其转换为另一个相同形式RDD[(scala.collection.immutable.Vector[(Int, Int)], Int)]
的RDD。
即(Vector((1,1), (5,2)),2))
将其计数2贡献给作为其子集的任何密钥,如同(Vector((5,2)),1)
变为(Vector((5,2)),3)
。
对于上面的例子,我们的新RDD将具有
(Vector((5,2)),3), (Vector((1,1)),4), (Vector((1,1), (5,2)),2)
我该如何做到这一点?请帮忙。
首先可以介绍Seq
:的subsets
操作
implicit class SubSetsOps[T](val elems: Seq[T]) extends AnyVal {
def subsets: Vector[Seq[T]] = elems match {
case Seq() => Vector(elems)
case elem +: rest => {
val recur = rest.subsets
recur ++ recur.map(elem +: _)
}
}
}
empty
子集始终是结果向量中的第一个元素,因此可以使用.tail
省略它
现在,您的任务非常明显map
-reduce
,就RDD
:而言,这就是flatMap
-reduceByKey
val result = curRdd
.flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
.reduceByKey(_ + _)
更新
这个实现可能会在结果中引入新的集合,如果你只想选择原始集合中出现的集合,你可以将结果与原始集合连接:
val result = curRdd
.flatMap { case (keys, count) => keys.subsets.tail.map(_ -> count) }
.reduceByKey(_ + _)
.join(curRdd map identity[(Seq[(Int, Int)], Int)])
.map { case (key, (v, _)) => (key, v) }
注意,需要map identity
步骤来将密钥类型从原始RDD
中的Vector[_]
转换为Seq[_]
。您可以修改curRdd.collect()
0定义,将Seq[T]
的所有出现时间替换为Vector[T]
,或者按照以下(硬编码scala.collection
)方式更改定义:
import scala.collection.SeqLike
import scala.collection.generic.CanBuildFrom
implicit class SubSetsOps[T, F[e] <: SeqLike[e, F[e]]](val elems: F[T]) extends AnyVal {
def subsets(implicit cbf: CanBuildFrom[F[T], T, F[T]]): Vector[F[T]] = elems match {
case Seq() => Vector(elems)
case elem +: rest => {
val recur = rest.subsets
recur ++ recur.map(elem +: _)
}
}
}