我有一个愚蠢的问题,涉及fold
和PySpark
中的reduce。我理解这两种方法之间的区别,但是,如果两者都需要所应用的函数是可交换的monoid,我无法找出fold cannot be substituted by
减少的例子。
此外,在fold
的PySpark实现中,它使用了acc = op(obj, acc)
,为什么使用此操作顺序而不是acc = op(acc, obj)
?(对我来说,这第二个订单听起来更接近leftFold
)
干杯
托马斯
空RDD
RDD
为空时不能替换:
val rdd = sc.emptyRDD[Int]
rdd.reduce(_ + _)
// java.lang.UnsupportedOperationException: empty collection at
// org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...
rdd.fold(0)(_ + _)
// Int = 0
当然,您可以将reduce
与isEmpty
上的条件相结合,但它相当难看。
可变缓冲液
fold的另一个用例是具有可变缓冲区的聚合。考虑以下RDD:
import breeze.linalg.DenseVector
val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)
假设我们想要所有元素的和。一个简单的解决方案是简单地用+
:进行还原
rdd.reduce(_ + _)
不幸的是,它为每个元素创建了一个新的向量。由于对象创建和随后的垃圾收集非常昂贵,因此最好使用可变对象。用reduce
是不可能的(RDD的不变性并不意味着元素的不变性),但用fold
可以实现如下:
rdd.fold(DenseVector(0))((acc, x) => acc += x)
这里使用零元素作为可变缓冲区,每个分区初始化一次,不影响实际数据。
acc=op(obj,acc),为什么使用此操作顺序而不是acc=op(acc,obj)
参见SPARK-6416和SPARK-7683