Spark job returns "Paused for longer than xxx seconds and unable to write pages to client"

I am working on a Spark job that reads data from a Cassandra database and then performs some operations on it based on certain conditions, such as updating some records. Spark works fine for some keyspaces and tables, but for some other keyspaces it fails prematurely with a stack trace like the following:

WARN  2022-03-30 21:39:50,082 org.apache.spark.scheduler.TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, 10.0.144.110, executor 0): com.datastax.driver.core.exceptions.DriverException: Paused for longer than 600 seconds and unable to write pages to client
at com.datastax.driver.core.exceptions.DriverException.copy(DriverException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:26)
at com.datastax.driver.core.DefaultContinuousPagingResult$RowIterator.maybeFetchNextResult(DefaultContinuousPagingResult.java:72)
at com.datastax.driver.core.DefaultContinuousPagingResult$RowIterator.computeNext(DefaultContinuousPagingResult.java:62)
at com.datastax.driver.core.DefaultContinuousPagingResult$RowIterator.computeNext(DefaultContinuousPagingResult.java:50)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at com.company.spark.job.MySparkJobClass.lambda$handleMovingRackspaceImagesToS3$e3b46054$1(MySparkJobClass.java:167)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:934)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:934)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2073)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2073)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.DriverException: Paused for longer than 600 seconds and unable to write pages to client
at com.datastax.driver.core.exceptions.DriverException.copy(DriverException.java:37)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:184)
at com.datastax.driver.core.ContinuousPagingQueue.onResponse(ContinuousPagingQueue.java:148)
at com.datastax.driver.core.MultiResponseRequestHandler.setResult(MultiResponseRequestHandler.java:888)
at com.datastax.driver.core.MultiResponseRequestHandler.onSet(MultiResponseRequestHandler.java:600)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1253)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1160)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1407)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1177)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1221)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: com.datastax.driver.core.exceptions.ClientWriteException: Paused for longer than 600 seconds and unable to write pages to client
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:124)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:58)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:303)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:274)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
... 29 more

I found from here that Spark executors have a default maximum of 60 heartbeat failures with a heartbeat interval of 10 seconds, which works out to the 600 seconds, but the question now is: what is making these executors fail? Any tips that could help?

Update:

From the Spark configuration docs for spark.executor.heartbeat.maxFailures:

Number of times an executor will try to send heartbeats to the driver before it gives up and exits (with exit code 56).
Default: 60
For example, if max failures is 60 (the default) and spark.executor.heartbeatInterval is 10s, then the executor will try to send heartbeats for up to 600 seconds (10 minutes).
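As a minimal sketch of experimenting with these settings (the values below are hypothetical, and widening the heartbeat window only buys time rather than fixing whatever is slowing the executors down), they can be set when the job's Spark context is built:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class HeartbeatTuningSketch {
    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        SparkConf conf = new SparkConf()
                .setAppName("cassandra-maintenance-job")
                // Heartbeat every 30s; with the default 60 max failures
                // this gives executors up to 30 minutes before exit code 56.
                .set("spark.executor.heartbeatInterval", "30s")
                // Spark requires spark.network.timeout to be at least
                // as large as the heartbeat interval.
                .set("spark.network.timeout", "900s");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... run the job ...
        sc.stop();
    }
}
```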

The ClientWriteException gets thrown by the Java driver when a server-side read takes too long. Without knowing the table schema and the queries your Spark job is running, it's difficult to say why the reads are taking so long.
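If the continuous-paging reads visible in your stack trace turn out to be the bottleneck, here is a hedged sketch of connector-level tuning to try (it assumes a DSE cluster and Spark Cassandra Connector 2.x; the property names and values are assumptions to verify against your actual versions):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadTuningSketch {
    public static void main(String[] args) {
        // All values below are illustrative assumptions, not verified fixes.
        SparkConf conf = new SparkConf()
                .setAppName("cassandra-read-job")
                // DSE-specific flag (assumption): fall back from continuous
                // paging to regular driver paging.
                .set("spark.dse.continuousPagingEnabled", "false")
                // Connector 2.x read knobs: smaller pages and throttled reads
                // reduce how long the server must hold pages for the client.
                .set("spark.cassandra.input.fetch.size_in_rows", "500")
                .set("spark.cassandra.input.reads_per_sec", "100");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... read from Cassandra and process partitions ...
        sc.stop();
    }
}
```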

Admittedly, the stack trace looks foreign to me. You haven't specified the versions of the Spark connector and the Java driver, so my assumption is that you are possibly running a very old Java driver version that is no longer supported.

If you update your original question with background details such as versions, schema, the CQL queries, and minimal code to replicate the issue, I'd be happy to review it and update my answer. Cheers!
