提前停止Spark中的reduce操作



有没有办法"破坏"Spark中的reduce操作?

假设我想这样做:

var result: Int = 0
while (iterator.hasNext()) {
if (/*Some condition on result*/) {
result = someFunction(result, iterator.next())
} else {
break
}
}

在Spark中执行此操作的方法是reduce一个RDD(包含迭代器返回的内容(:

rdd.reduce((result, next) =>
if (/*Some condition on result*/) someFunction(result, next)
else result
)

但是,如果计算条件很昂贵呢?我可以这样做:

//result is now of type (Int, Boolean) and the initial Boolean is true
rdd.reduce((result, next) =>
if (result._2 && /*Some condition on result*/) (someFunction(result._1, next._1), true)
else (result._1, false)
)

有更干净的方法吗?

不幸的是,您想要做的并不一定适合spark的计算模型。因为数据是跨分区分割的,所以不一定有定义良好的顺序。如果您没有使用spark,那么合适的函数调用应该是takeWhile

val data: List[DType] = ...
...
data.takeWhile(condition).map(someFunction)

你几乎可以通过mapPartitions来实现这一点。它将一个函数应用于每个分区上的迭代器。

val data: RDD[DType] = ...
...
data.mapPartitions(partitionData: Iterator[DType] => partitionData.takeWhile(condition).map(someFunction))

最新更新