我有两个数据范围,df1
,有2200万条记录,df2
和200万记录。我在email_address
上做正确的加入作为密钥。
test_join = df2.join(df1, "email_address", how = 'right').cache()
两个数据帧中的复制(如果有)很少。加入后,我尝试使用此代码来查找所得数据帧test_join
的分区大小:
l = builder.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))
结果表明,最大的分区大约是大于平均分区大小的100倍。这个分区大小的偏斜在加入后转换和动作中给出了性能问题。
我知道我可以在使用repartion(num_partitions)
命令加入后同样重新分配它,但是我的问题是为什么我会遇到这个不均匀的分区结果,并且有任何方法首先避免它。
p.s:仅检查问题是否仅带有email_address哈希功能,我还检查了其他几个加入的分区尺寸,我还在数字密钥中看到了这个问题。
@user6910411。问题在于我的数据,随后有一些愚蠢的惯例输入空电子邮件,这导致了这个偏斜的关键问题。
检查最大分区中的Enteries后,我知道那里发生了什么。我发现这种调试技术非常有用,我敢肯定,这可以帮助其他面临同一问题的人。
btw,这是我写的功能,以找到RDD分区的偏差:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample, to make processing fast
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference between largest and smallest partition size is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'nSample Content of the largest Partition: n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
,然后我只通过了我想检查偏斜的数据框架:
check_skewness(test_join)
,它为我提供了有关其偏度的好信息。