如何解决"spark.akka.framesize" "job aborted due to stage failure"?



我有一个 Spark 程序,它正在执行一堆列操作,然后调用.collect()将结果拉入内存。

我在运行代码时收到此问题:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 302987:27 was 139041896 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.

可以在此处看到更完整的堆栈跟踪:https://pastebin.com/tuP2cPPe

现在我想知道我需要对我的代码和/或配置进行哪些更改才能解决此问题。 我有几个想法:

  • 按照建议增加spark.akka.frameSize。 我有点不愿意这样做,因为我不太了解这个参数,对于其他工作,我可能更喜欢默认值。 有没有办法在应用程序中指定这一点? 是否可以在类似于分区数的代码中动态更改它?

  • 在表上调用collect()之前减少分区数。 我有一种感觉,当分区太多时调用collect()会导致此操作失败。 在将所有这些部分拉入内存时,它给驾驶员带来了太大的压力。

我不明白Consider...using broadcast variables for large values的建议. 这将如何帮助? 无论我是否在每个执行器上都有数据副本,我仍然需要将结果拉回驱动程序。

我还有其他想法吗? 感谢。

我认为这个错误有点误导。 此错误是因为您尝试下载回驱动程序的结果大于 Akka(Spark 使用的底层网络库)可以容纳的消息。广播变量用于有效地将数据发送到工作节点,这与你尝试执行的方向相反。

通常,当收集要拉回大量数据时,您不想执行收集,因为尝试将该结果下载到一个节点时,您将失去作业的任何并行性。 如果您有太多数据,这可能会花费很长时间,或者可能导致您的作业失败。 您可以尝试增加 Akka 帧大小,直到它足够大,以至于您的工作不会失败,但将来随着数据的增长,这可能会再次中断。

更好的解决方案是使用RDD写入API将结果保存到一些分布式文件系统(HDFS,S3)。 然后,您可以使用Spark在后续作业中使用它执行更多的分布式操作以将其读回,或者您可以直接从分布式文件系统下载结果并对其进行任何操作。

最新更新