我在Spark作业中遇到了困难,大约有一半的时间,Spark作业会选择处理单个节点上的所有数据,然后该节点会耗尽内存并死亡。
问题:我如何确保这种情况在任何时候都不会发生?
该系统在Yarn上使用Spark 1.6.0,从Hadoop 2.6数据存储中提取,所有代码都用Java编写。我在一个有十几个ish节点的集群(Amazon)中动态分配资源。
DAG相对简单:
RDD --> mapToPair
coGroup --> flatMapToPair --> reduceByKey --> save
RDD --> mapToPair /
当它正确运行时,所有任务都会很好地分布在集群中,整个作业大约需要20分钟。我们称之为"良好行为"。然而,有时flatMapToPair阶段实际上在单个执行器中运行。我们称之为"不良行为">
当我为一个"坏行为"作业加载Spark UI并深入到flatMapToPair阶段时,我发现事实上,每个节点上大约有3-4个执行器(与"好行为"情况相同)。然而,除了一个外,所有的执行器都在几分之一秒内完成,剩下的执行器运行了10分钟,然后因超过内存限制而被纱线杀死。
我已经尝试过的东西:
网络。搜索"spark在一个节点上运行"和变体几乎都会导致人们在spark shell中以本地模式运行或类似的配置问题。考虑到我至少在某些时候表现良好,这些类型的配置问题似乎不太可能(我已经检查过我不是意外地处于本地模式,我有大约100个分区,…)
在同一集群上运行的其他Spark作业表现良好。这似乎排除了一些集群范围内的错误配置(见鬼,甚至这个作业有时运行得很好)。
集群利用率似乎并不影响我得到的是好行为还是坏行为。我看到了这两种行为,无论是在集群被大量利用时,还是在集群根本没有其他运行时。
这似乎不是一个纱线问题,因为执行器都在集群中分布良好。当然,我可能错了,但问题似乎真的是执行者之间的工作分配。
数据集中有多个键。我在coGroup和flatMapToPair之间插入了一个countByKey,并打印了结果(对于20个左右人口最多的键)。数据在这些顶级关键字中分布得相当均匀。
我在回复评论时尝试过的东西
在flatMapToPair调用之前重新划分RDD,以强制500个分区。这只会将不良行为转移到重新划分阶段。
增加默认的并行度。我确实通过这种方式获得了更多的分区,但糟糕的行为仍然存在于flatMapToPair阶段。
删除数据(实际上,我在发布之前做了很多这样的工作,但没有将其包含在原始列表中)。我们只有几十GB,我已经加载了我需要的最低限度的数据。
这是一个"有趣"的小黑森伯格,在添加调试日志记录后,不良行为会消失,然后在删除日志记录后保持不动,只是在一段时间后再次出现。我没有主意,所以如果有人有一些推荐的诊断步骤,我会洗耳恭听。
我遇到了一些非常相似的东西,虽然我对这个解决方案并不完全满意,因为我无法完全解释它为什么有效,但它似乎确实有效。在我的情况下,它是在一次洗牌之后,洗牌后的数据大小相当小。问题是,随后的计算大大增加了数据的大小,以至于在1或2个执行器上进行这些计算成为瓶颈。我的最佳猜测是,它与启发式有关,启发式涉及数据源的首选位置和目标分区大小,可能与不知道后期正在进行的扩展相结合。
通过添加coalesce(totalCores)
,我能够获得一致、分布良好的shuffle,其中totalCores
定义为spark.executor.instances
xspark.executor.cores
。它似乎也适用于较大倍数的totalCores
,但在我的情况下,我不需要更多的并行性。注意,根据使用情况,可能需要使用repartition
而不是coalesce
。此外,这是在火花2.2.1上进行的,以供参考。