Table creation fails when running a scheduled job on Databricks



I have been doing a left join between two tables in SQL (table A contains billions of rows, table B contains millions of rows) and creating a new table (table C) from the result of the join. I am using an XXL SQL warehouse on Databricks with two workers to handle the compute for this operation.

The main problem is that when I schedule this job to run, it seems to fail every single time. When I run it manually, the job completes almost every time.

DROP TABLE IF EXISTS tableC;
CREATE TABLE tableC AS (
  SELECT *
  FROM tableA
  LEFT JOIN tableB
    ON tableA.id = tableB.id
);

When the SQL query runs as a scheduled job, I get the error below. From what I understand, the error happens in the final stage of the job, when it is writing the rows out to the Delta table, but I cannot figure out the cause of the failure. I have already checked that the data is not corrupted and that I have enough memory. The part that confuses me most is that the SQL command works when run manually, yet it seems to fail every time the query runs as part of the scheduled job.

Job aborted due to stage failure: ResultStage 9 (write at WriteIntoDeltaCommand.scala:70) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.FetchFailedException
at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:315)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.$anonfun$next$1(ShuffleBlockFetcherIterator.scala:905)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:736)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.RowToColumnarExec$$anon$1.hasNext(Columnar.scala:491)
at com.databricks.photon.CloseableIterator$$anon$5.hasNext(CloseableIterator.scala:62)
at com.databricks.photon.NativeColumnBatchIterator.hasNext(NativeColumnBatchIterator.java:47)
at 0x8235362 <photon>.HasNext(external/workspace_spark_3_3/photon/jni-wrappers/jni-native-column-batch-iterator.cc:62)
at 0x4707a79 <photon>.OpenImpl(external/workspace_spark_3_3/photon/exec-nodes/file-writer-node.cc:161)
at com.databricks.photon.JniApiImpl.open(Native Method)
at com.databricks.photon.JniApi.open(JniApi.scala)
at com.databricks.photon.JniExecNode.open(JniExecNode.java:64)
at com.databricks.photon.PhotonWriteStageExec.$anonfun$executeWrite$5(PhotonWriteStageExec.scala:87)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.photon.PhotonExec.timeit(PhotonExec.scala:300)
at com.databricks.photon.PhotonExec.timeit$(PhotonExec.scala:298)
at com.databricks.photon.PhotonWriteStageExec.timeit(PhotonWriteStageExec.scala:38)
at com.databricks.photon.PhotonWriteStageExec.$anonfun$executeWrite$4(PhotonWriteStageExec.scala:87)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1730)
at com.databricks.photon.PhotonWriteStageExec.$anonfun$executeWrite$2(PhotonWriteStageExec.scala:84)
at com.databricks.photon.PhotonExec.$anonfun$executePhoton$2(PhotonExec.scala:484)
at com.databricks.photon.PhotonExec.$anonfun$executePhoton$2$adapted(PhotonExec.scala:353)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:882)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:882)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:62)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:223)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more

Based on the stack trace, the problem has two parts:

org.apache.spark.shuffle.FetchFailedException   (the operation that failed)
Caused by: java.nio.channels.ClosedChannelException   (the root cause)

The problem is likely that the default retry and/or timeout configuration is not suited to such a complex workload. Have a look at this article and try its suggestions, primarily those in the "network timeout" section: https://towardsdatascience.com/fetch-failed-exception-in-apache-spark-decrypting-the-most-common-causes-b8dff21075c
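
For reference, here is a minimal sketch of the kind of shuffle timeout/retry tuning that section describes, written as cluster-level Spark config. The values are illustrative placeholders rather than tuned recommendations, and this assumes the scheduled job runs on a classic or job cluster where Spark config can be set at cluster creation; arbitrary Spark properties like these typically cannot be set on a SQL warehouse.

spark.network.timeout 600s
spark.shuffle.io.maxRetries 10
spark.shuffle.io.retryWait 30s

Raising spark.network.timeout gives slow shuffle fetches more time before the connection is treated as dead, while the two spark.shuffle.io.* settings allow a failed block fetch to be retried more times with a longer wait between attempts, which gives the job a better chance of surviving the transient ClosedChannelException seen in the stack trace.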