我有一个关于Spark的非常基本的问题。我通常使用50个内核来运行火花作业。在查看工作进展时,大多数时间都显示了50个并行运行的过程(应该这样做),但有时仅显示并行运行的2或4个火花过程。这样:
[Stage 8:================================> (297 + 2) / 500]
正在处理的RDD是100多个分区的repartitioned
。所以这不是一个问题。
我有一个观察。我已经看到了大多数情况下发生的模式,SparkUI中的数据局部性显示NODE_LOCAL
,而其他时间则所有50个过程正在运行时,某些过程显示RACK_LOCAL
。这使我怀疑,也许会发生这种情况,因为数据在处理相同的节点之前已被缓存,以避免网络开销,这会减慢进一步的处理。
如果是这种情况,避免它的方法是什么。如果不是这样,这里是怎么回事?
在解决这个问题的一周或更长时间之后,我认为我已经找到了导致问题的原因。
如果您在同一问题上苦苦挣扎,那么开始的好点是检查Spark实例是否已配置良好。有一篇很棒的Cloudera博客文章。
但是,如果问题与配置没有(就像我一样),则问题在您的代码中的某个位置。问题在于,有时由于原因(偏斜连接,数据源中的分区等不均匀),您正在使用的RDD会在2-3个分区上获得很多数据,而其余的分区几乎没有数据。
为了减少整个网络上的数据洗牌,Spark试图每个执行程序都在该节点上处理本地驻留的数据。因此,2-3位执行人工作了很长时间,其余执行者只是用几毫秒的数据完成了数据。这就是为什么我经历了上面问题中描述的问题。
调试此问题的方法是首先检查RDD的分区大小。如果与其他分区相比,一个或几个分区非常大,那么下一步将是在大分区中找到记录,以便您知道,尤其是在偏斜的加入的情况下,钥匙变得偏向了。我写了一个小功能来调试以下内容:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'nSample Content of the largest Partition: n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
它给了我最小,最大的分区尺寸,如果这两个之间的差异超过5次,它将打印出最大分区的5个元素,应该使您对正在发生的事情有一个粗略的了解。
一旦您弄清楚问题是偏斜的分区,您就可以找到一种方法来摆脱该偏斜的密钥,或者您可以重新分配您的数据框架,这将迫使它同样分发,并且您将会现在查看所有执行者都将在相等的时间工作,您会看到少于可怕的OOM错误,处理也将很快。
这只是我的两分钱作为火花新手,我希望Spark专家可以在这个问题上增加一些东西,因为我认为Spark World中的许多新手经常面临类似的问题。