Spark RDD:多次reducebykey或仅一次



我有如下代码:

// make a rd according to an id
def makeRDD(id:Int, data:RDD[(VertexId, Double)]):RDD[(Long, Double)] = { ... }  
val data:RDD[(VertexId, Double)] = ... // loading from hdfs
val idList = (1 to 100)
val rst1 = idList.map(id => makeRDD(id, data)).reduce(_ union _).reduceByKey(_+_)
val rst2 = idList.map(id => makeRDD(id, data)).reduce((l,r) => (l union r).reduceByKey(_+_))

rst1和rst2得到样本结果。我认为rst1需要更多的内存(100次),但只有一次reduceByKey转换;然而,rst2需要更少的内存,但需要更多的reduceByKey转换(99次)。那么,这是一场时间和空间的权衡游戏吗?

我的问题是:我上面的分析是对的,还是Spark内部以同样的方式对待翻译动作?

p.S.:rst1并集所有子rdd,然后reduceByKey,reduceByKeyreduce之外。rst2 reduceByKey一个接一个,reduceByKey在reduce内部

长话短说,两种解决方案的效率相对较低,但第二种方案比第一种方案更差。

让我们从回答最后一个问题开始。对于低级RDD API,只有两种类型的全局自动优化(相反):

  • 使用显式或隐式缓存的任务结果,而不是重新计算完整沿袭
  • 将不需要混洗的多个变换组合为单个ShuffleMapStage

其他一切都是定义DAG的顺序转换。这与更具限制性的高级Dataset(DataFrame)API形成鲜明对比,后者对转换进行特定假设,并对执行计划进行全局优化。

关于您的代码。第一个解决方案的最大问题是,当您应用迭代union时,谱系会不断增长。它使一些事情变得昂贵,比如故障恢复,并且由于RDD是递归定义的,因此可能会因StackOverflow异常而失败。一个不太严重的副作用是分区数量的增加,这似乎在随后的减少中没有得到补偿*。你会在我对由于长RDD系列而导致的堆栈溢出的回答中找到更详细的解释,但你真正需要的是一个像这样的union

sc.union(idList.map(id => makeRDD(id, data))).reduceByKey(_+_)

这实际上是一个最佳的解决方案,假设你应用了真正的减少函数。

第二种解决方案显然也存在同样的问题,但情况越来越糟。虽然第一种方法只需要两个阶段进行一次混洗,但这需要对每个RDD进行混洗。由于分区的数量在增长,并且您使用默认的HashPartitioner,所以每条数据都必须多次写入磁盘,并且很可能在网络上多次搅乱。忽略低级别计算,每条记录被打乱O(N)次,其中N是合并的RDD的数量。

关于内存使用,如果不了解更多关于数据分布的信息,这一点并不明显,但在最坏的情况下,第二种方法可能会表现出更糟糕的行为。

如果+在恒定空间下工作,则对归约的唯一要求是使用哈希映射来存储映射侧组合的结果。由于分区是作为数据流处理的,而没有将完整的内容读取到内存中,这意味着每个任务的总内存大小将与唯一键的数量成比例,而不是与数据量成比例。由于第二种方法需要更多的任务,因此总体内存使用率将高于第一种情况。平均而言,由于数据是部分组织的,它可能会稍微好一点,但不太可能补偿额外的成本。


*如果你想了解它如何影响整体性能,你可以看到Spark迭代时间在使用join时呈指数级增长。这是一个略有不同的问题,但应该让你知道为什么控制分区数量很重要。

最新更新