16个任务(1048.5 MB)的序列化结果的总大小大于Spark.Driver.maxresultsize(1024.



我在我的 spark-submit命令中添加 --conf spark.driver.maxResultSize=2050时会出现以下错误。

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)

添加此配置的原因是错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

因此,我将maxResultSize提高到2.5 GB,但是Spark作业无论如何都会失败(上面显示的错误)。如何解决此问题?

似乎问题是您试图将其拉回驱动程序的数据量太大。您很可能正在使用 collect 方法从 dataframe/rdd 中检索所有值。驱动程序是一个过程,通过收集 dataframe 您正在将所有分布在集群中分布回到一个节点的数据。这打败了分发它的目的!只有在将数据降低到可管理的金额之后,这样做才有意义。

您有两个选择:

  1. 如果您确实需要使用所有这些数据,则应将其保留在执行者上。使用 HDFS parquet 以分布式方式保存数据,并使用Spark方法与群集上的数据一起工作,而不是试图将其全部收集回到一个地方。

  2. 如果您确实需要将数据还给驱动程序,则应检查是否真的需要所有数据。如果您只需要摘要统计信息,请在拨打收集之前在执行者上计算出来。或者,如果您只需要前100个结果,则只需要收集前100名。

更新:

还有另一个原因您可以遇到此错误,这一点不太明显。当您明确调用收集时,Spark将尝试将数据发送回驱动程序。如果您使用累加器,广播加入数据以及有关每个任务的一些小型状态数据,它还将为每个任务发送累加器结果。如果您有很多分区(根据我的经验20K ),有时可以看到此错误。这是一个已知的问题,可以进行一些改进,而在此过程中进行了更多改进。

如果您的问题是:

是否可以超越的选项:
  1. 增加 spark.driver.maxResultSize或将其设置为无限
  2. 的0
  3. 如果广播连接是罪魁祸首,则可以减少spark.sql.autoBroadcastJoinThreshold来限制广播的大小加入数据
  4. 减少分区的数量

原因:由RDD的Collect()等动作引起

解决方案:由SparkConf设置:conf.set("spark.driver.maxResultSize", "4g")或者由spark-defaults.conf设置: spark.driver.maxResultSize 4g或者呼叫Spark-Submit时设置:--conf spark.driver.maxResultSize=4g

相关内容

  • 没有找到相关文章