RDD中有任何操作吗?维持秩序



我想要RDD性能中的一个操作,就像reduce一样,但不需要运算符是可交换的。即我希望后面的CCD_ 3将始终是CCD_。

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> val result = rdd.someAction{ _+_ }

首先,我找到了foldRDD#fold的医生说:

def-fold(零值:T)(运算:(T,T)⇒T) :T聚合每个分区,然后使用给定关联函数和中性"零值"

请注意,文档中不需要可交换。然而,结果并不像预期的那样:

scala> rdd.fold(""){ _+_ }
res10: String = 312456879

编辑我已经尝试过@dk14提到的,但没有运气:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res22: String = 341276895
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res23: String = 914856273
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res24: String = 742539618
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res25: String = 271468359

Scala中没有满足这一标准的内置归约操作,但您可以通过结合mapPartitionscollect和局部归约来轻松实现自己的归约:

import scala.reflect.ClassTag
def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = {
  rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f)
}

使用collectreduce的组合进行合并,而不是fold使用的异步无序方法,确保了全局顺序的保留。

这当然会带来一些额外的成本,包括:

  • 驱动器上的内存占用稍高
  • 明显更高的延迟-我们明确地等待所有任务完成,然后再开始本地减少

@YuvalItzchakov指出,fold在组合结果时不会保留分区RDD中的排序。为了说明这一点,考虑将原始RDD合并到一个唯一的分区

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1)
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res4: String = 123456789
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res5: String = 123456789
scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ }
res6: String = 123456789

最新更新