为什么Spark中需要折叠操作



我有一个愚蠢的问题,涉及foldPySpark中的reduce。我理解这两种方法之间的区别,但是,如果两者都需要所应用的函数是可交换的monoid,我无法找出fold cannot be substituted by减少的例子。

此外,在fold的PySpark实现中,它使用了acc = op(obj, acc),为什么使用此操作顺序而不是acc = op(acc, obj)?(对我来说,这第二个订单听起来更接近leftFold

干杯

托马斯

空RDD

RDD为空时不能替换:

val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at   
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...
rdd.fold(0)(_ + _)
// Int = 0

当然,您可以将reduceisEmpty上的条件相结合,但它相当难看。

可变缓冲液

fold的另一个用例是具有可变缓冲区的聚合。考虑以下RDD:

import breeze.linalg.DenseVector
val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)

假设我们想要所有元素的和。一个简单的解决方案是简单地用+:进行还原

rdd.reduce(_ + _)

不幸的是,它为每个元素创建了一个新的向量。由于对象创建和随后的垃圾收集非常昂贵,因此最好使用可变对象。用reduce是不可能的(RDD的不变性并不意味着元素的不变性),但用fold可以实现如下:

rdd.fold(DenseVector(0))((acc, x) => acc += x)

这里使用零元素作为可变缓冲区,每个分区初始化一次,不影响实际数据。

acc=op(obj,acc),为什么使用此操作顺序而不是acc=op(acc,obj)

参见SPARK-6416和SPARK-7683

最新更新