突出显示
我已经升级了Spark,并尝试在YARN上运行已经存在的Spark Streaming应用程序(通过流接受文件名,然后从HDFS读取这些文件名,使用rdd和数据帧操作进行转换,最后将分析的数据集持久化到HBase(,该应用程序失败,无法解决问题。
环境详细信息如下
使用版本
平台
OS:RHEL 6.6,128GB RAM,42TB HDD,32核
Java:1.8.0_25
Scala:2.11
Hadoop:2.7.7
Spark:2.4.6,带有Hadoop 2.7二进制文件
HBase:1.4.12
升级后不工作
Spark:3.0与Hadoop 2.7二进制文件
根据Spark 3.0.0的要求,使用Scala 2.12编译了相同的代码,根据版本变化进行了一些小的更改,没有逻辑更改。
所需的YARN配置
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,spark_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property>
启动作业时通过了Spark配置
spark.app.name=Ingestion
spark.eventLog.enabled=true
spark.yarn.historyServer.address=${hadoopconf-yarn.resourcemanager.hostname}:18088
spark.eventLog.dir=hdfs:///user/hduser/applicationHistory
spark.submit.deployMode=cluster
spark.driver.memory=1GB
spark.driver.cores=1
spark.executor.memory=5GB
spark.executor.cores=5
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.sql.shuffle.partitions=2001
spark.logging.level=INFO
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.archive=hdfs:///spark-3.0.0-bin-hadoop2.7-jars.zip
spark.ui.killEnabled=false
spark.driver.memoryOverhead=512
spark.executor.memoryOverhead=1024
spark.yarn.maxAppAttempts=4
spark.yarn.am.attemptFailuresValidityInterval=1h
问题
同一段代码使用相同Hadoop、YARN、spark配置集的spark 2.4.4、2.4.5、2.4.6版本。当我升级到spark 3.0.0时,代码开始失败,出现以下异常。尝试过多次调整,如增加资源、减少分区等,但都没有成功。已通过telnet检查端口7337,它已打开并正在侦听。经过一周的调试,我们找不到任何解决方案,而且似乎没有理由关闭shuffle端口连接
处理几乎不到50MB的数据集。相同的代码能够处理超过300 MB的数据,具有与Spark 2.4.x完全相同的配置。这太奇怪了
异常
org.apache.spark.shuffle.FetchFailedException: Connection from server1/xxx.xxx.x.xxx:7337 closed
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:748)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:663)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:70)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:155)
at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:129)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:107)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$Lambda$597/1323895653.apply(Unknown Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:859)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:859)
at org.apache.spark.rdd.RDD$$Lambda$584/1207730390.apply(Unknown Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.executor.Executor$TaskRunner$$Lambda$421/1364680867.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection from server1/xxx.xxx.x.xxx:7337 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
还有其他人面临这个问题吗?如果是,请告诉我你是如何解决的。我没有更多的线索可以检查。如有任何帮助,我们将不胜感激
谢谢
Spark 3.x无法使用旧的shuffle服务。
如果您想保留旧的shuffle服务,请尝试以下配置更改。
spark.shuffle.useOldFetchProtocol=true
参考https://issues.apache.org/jira/browse/SPARK-29435