找到给定RDD的每个分区大小的最佳方法是什么?我正在尝试调试一个偏斜的分区问题,我已经尝试过:
l = builder.rdd.glom().map(len).collect() # get length of each partition
print('Min Parition Size: ',min(l),'. Max Parition Size: ', max(l),'. Avg Parition Size: ', sum(l)/len(l),'. Total Partitions: ', len(l))
它适用于小型RDD,但是对于较大的RDD,它给出了OOM错误。我的想法是glom()
正在导致这种情况。但是无论如何,只是想知道是否有更好的方法?
使用:
builder.rdd.mapPartitions(lambda it: [sum(1 for _ in it)])
@lostinoverflow的答案效果很好。我找到了使用以下代码找到每个分区的大小和索引的另一种方法。多亏了这个很棒的帖子。
这是代码:
l = test_join.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
然后您可以使用此代码获得最大和最小尺寸分区:
min(l,key=lambda item:item[1])
max(l,key=lambda item:item[1])
找到偏斜分区的钥匙,如果需要,我们可以进一步调试该分区的内容。
如果有人来这里寻找Scala解决方案:
// For DataFrame:
df.mapPartitions(it => Iterator(it.size))
// For RDD:
df.rdd.mapPartitions(it => Iterator(it.size))