Spark 核心和任务并发



我有一个关于Spark的非常基本的问题。我通常使用50个内核来运行火花作业。在查看工作进展时,大多数时间都显示了50个并行运行的过程(应该这样做),但有时仅显示并行运行的2或4个火花过程。这样:

[Stage 8:================================>                      (297 + 2) / 500]

正在处理的RDD是100多个分区的repartitioned。所以这不是一个问题。

我有一个观察。我已经看到了大多数情况下发生的模式,SparkUI中的数据局部性显示NODE_LOCAL,而其他时间则所有50个过程正在运行时,某些过程显示RACK_LOCAL。这使我怀疑,也许会发生这种情况,因为数据在处理相同的节点之前已被缓存,以避免网络开销,这会减慢进一步的处理。

如果是这种情况,避免它的方法是什么。如果不是这样,这里是怎么回事?

在解决这个问题的一周或更长时间之后,我认为我已经找到了导致问题的原因。

如果您在同一问题上苦苦挣扎,那么开始的好点是检查Spark实例是否已配置良好。有一篇很棒的Cloudera博客文章。

但是,如果问题与配置没有(就像我一样),则问题在您的代码中的某个位置。问题在于,有时由于原因(偏斜连接,数据源中的分区等不均匀),您正在使用的RDD会在2-3个分区上获得很多数据,而其余的分区几乎没有数据。

为了减少整个网络上的数据洗牌,Spark试图每个执行程序都在该节点上处理本地驻留的数据。因此,2-3位执行人工作了很长时间,其余执行者只是用几毫秒的数据完成了数据。这就是为什么我经历了上面问题中描述的问题。

调试此问题的方法是首先检查RDD的分区大小。如果与其他分区相比,一个或几个分区非常大,那么下一步将是在大分区中找到记录,以便您知道,尤其是在偏斜的加入的情况下,钥匙变得偏向了。我写了一个小功能来调试以下内容:

from itertools import islice
def check_skewness(df):
    sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
    l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
    max_part = max(l,key=lambda item:item[1])
    min_part = min(l,key=lambda item:item[1])
    if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
        print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'nSample Content of the largest Partition: n'
        print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5)     if i == max_part[0] else []).take(5))
    else:
        print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part

它给了我最小,最大的分区尺寸,如果这两个之间的差异超过5次,它将打印出最大分区的5个元素,应该使您对正在发生的事情有一个粗略的了解。

一旦您弄清楚问题是偏斜的分区,您就可以找到一种方法来摆脱该偏斜的密钥,或者您可以重新分配您的数据框架,这将迫使它同样分发,并且您将会现在查看所有执行者都将在相等的时间工作,您会看到少于可怕的OOM错误,处理也将很快。

这只是我的两分钱作为火花新手,我希望Spark专家可以在这个问题上增加一些东西,因为我认为Spark World中的许多新手经常面临类似的问题。

最新更新