火花:找到RDD的每个分区大小



找到给定RDD的每个分区大小的最佳方法是什么?我正在尝试调试一个偏斜的分区问题,我已经尝试过:

l = builder.rdd.glom().map(len).collect()  # get length of each partition
print('Min Parition Size: ',min(l),'. Max Parition Size: ', max(l),'. Avg Parition Size: ', sum(l)/len(l),'. Total Partitions: ', len(l))

它适用于小型RDD,但是对于较大的RDD,它给出了OOM错误。我的想法是glom()正在导致这种情况。但是无论如何,只是想知道是否有更好的方法?

使用:

builder.rdd.mapPartitions(lambda it: [sum(1 for _ in it)])

@lostinoverflow的答案效果很好。我找到了使用以下代码找到每个分区的大小和索引的另一种方法。多亏了这个很棒的帖子。

这是代码:

l = test_join.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()

然后您可以使用此代码获得最大和最小尺寸分区:

min(l,key=lambda item:item[1])
max(l,key=lambda item:item[1])

找到偏斜分区的钥匙,如果需要,我们可以进一步调试该分区的内容。

如果有人来这里寻找Scala解决方案:

// For DataFrame:
df.mapPartitions(it => Iterator(it.size))
// For RDD:
df.rdd.mapPartitions(it => Iterator(it.size))

相关内容

  • 没有找到相关文章

最新更新