Why does my Spark job fail with FetchFailedException: NullPointerException?



I have a Spark (2.0) SQL query that joins several large Parquet tables, and one of its stages fails with a FetchFailedException. Looking at the failed tasks on the stage page in the Spark UI, it all seems to start with one container failing with this stack trace:

6/10/13 11:44:41 ERROR server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8886845294879492052
java.lang.NullPointerException
        at java.io.File.<init>(File.java:363)
        at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:77)
        at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:80)
        at org.apache.spark.shuffle.IndexShuffleBlockResolver.getDataFile(IndexShuffleBlockResolver.scala:54)
        at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:199)
        at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:278)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
        at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:60)
        at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:158)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:106)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:119)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

        at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
        at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:730)
        at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:605)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:162)
        at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:100)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

As far as I can tell, after this the fetch fails on the other side, in the tasks that request the shuffle output from that container.
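The query is essentially a multi-way join of large Parquet tables; a minimal sketch of that shape (hypothetical paths and join keys, not the actual query) looks like this:

// Hypothetical paths and join keys, for illustration only.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("large-parquet-join").getOrCreate()

val orders    = spark.read.parquet("/data/orders")       // large Parquet table
val customers = spark.read.parquet("/data/customers")    // large Parquet table
val items     = spark.read.parquet("/data/order_items")  // large Parquet table

// Each equi-join below becomes a sort-merge join with a shuffle on the join key,
// matching the SortMergeJoinExec frames in the stack trace above.
val joined = orders
  .join(customers, Seq("customer_id"))
  .join(items, Seq("order_id"))

joined.write.parquet("/data/joined_output")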

Since you are using Spark 2.0 and joining several large (Parquet) tables, your problem may be that the per-application user quota on the worker nodes has been used up, so shuffle files can no longer be written (although in that case the error would typically be a FileNotFound).

Check the executor logs for WARN messages written by the Parquet reader. Also check the size of the logs created for the application; if they are very large, they may be consuming the application's disk space quota.

parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251)parquet-mr (build ) org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr (build ) using format: (.+) version ((.*) )?(build ?(.*))

If the above warning is written for every Parquet record read, I would expect the logs to grow with the size of the Parquet tables.

If that is the case, suppress that logging by setting the levels to ERROR/OFF in log4j.properties:

log4j.rootCategory=ERROR, console
# switch off main offenders
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
log4j.logger.org.apache.hadoop.ipc.Client$Connection$1=ERROR
log4j.logger.org.apache.hadoop.io.retry.RetryInvocationHandler=ERROR
# Parquet related logging
log4j.logger.org.apache.parquet=ERROR
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
#log4j.logger.parquet.CorruptStatistics=ERROR
log4j.logger.parquet=ERROR
# setup console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} logLevel=%p thread=%t class=%C line_number=%L %m%n
log4j.logger.org=OFF
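The same suppression can also be done programmatically at the start of the job; this is a sketch using the log4j 1.x API that Spark 2.0 bundles, targeting the same logger names as the properties file above:

// Sketch: programmatic equivalent of the log4j.properties settings above.
import org.apache.log4j.{Level, Logger}

// Silence the Parquet statistics warnings (both logger-name variants)
Logger.getLogger("org.apache.parquet").setLevel(Level.ERROR)
Logger.getLogger("parquet").setLevel(Level.ERROR)

// Other noisy loggers listed in the properties file
Logger.getLogger("org.apache.hadoop.util.NativeCodeLoader").setLevel(Level.ERROR)
Logger.getLogger("org.apache.hadoop.io.retry.RetryInvocationHandler").setLevel(Level.ERROR)

Note that this only affects the JVM it runs in (normally the driver); to quiet the executors as well you still need to distribute the properties file, for example with spark-submit --files log4j.properties plus -Dlog4j.configuration=log4j.properties in spark.executor.extraJavaOptions.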

This issue is fixed in newer Spark versions: https://github.com/apache/spark/pull/15538
