我正在使用spark version 1.6.3
,yarn version 2.7.1.2.3
附带HDP-2.3.0.0-2557
。因为,火花版本在我使用的HDP版本中太旧了,我更喜欢远程使用另一个火花作为纱线模式。
这是我运行火花壳的方式;
./spark-shell --master yarn-client
一切似乎都很好,sparkContext
初始化了,sqlContext
初始化了。我甚至可以访问我的配置单元表。但在某些情况下,当它试图连接到块管理器时,它会遇到麻烦。
我不是专家,但我认为,当我在纱线模式下运行时,块管理器正在我的纱线集群上运行。这对我来说似乎是第一次网络问题,不想在这里问。但是,在某些情况下,我还无法弄清楚。所以这让我觉得这可能不是网络问题。
这是代码;
def df = sqlContext.sql("select * from city_table")
下面的代码工作正常;
df.limit(10).count()
但是大小超过10个,我不知道,每次运行都会发生变化;
df.count()
这引发了一个例外;
6/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.net.ConnectException: Connection refused: /172.27.247.204:56093
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more
)
我刚刚意识到,当有多个任务要洗牌时,就会发生这种情况。
问题是什么,是性能问题还是我看不到的其他网络问题。那是什么洗牌?如果是网络问题,是我的火花和纱线之间,还是纱线本身的问题?
谢谢。
编辑:
我只是在日志中看到一些东西;
17/01/02 06:45:17 INFO DAGScheduler: Executor lost: 2 (epoch 13)
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 172.27.247.204, 51809)
17/01/02 06:45:17 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
17/01/02 06:45:17 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/01/02 06:45:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.27.247.204:51809 with 511.1 MB RAM, BlockManagerId(2, 172.27.247.204, 51809)
有时,在另一个块管理器上重试它有效,但是,由于超过了默认允许的最大次数 4,因此大多数时间它永远不会结束。
编辑2:
Yarn 对此真的非常沉默,但我认为这是网络问题,我可以将问题迭代到某个地方;
此火花部署在 HDP 环境之外。当 Spark 向 yarn 提交申请时,yarn 会通知 spark 驱动程序有关块管理器和执行器的信息。执行程序是 HDP 群集中的数据节点,在其专用网络中具有不同的 IP。但是,当涉及到在群集外部通知 Spark 驱动程序时,它会为所有执行程序提供相同且始终单一的 IP。这是因为 HDP 群集中的所有节点都通过路由器发出并具有相同的 IP。假设 IP 是150.150.150.150
,当 Spark 驱动程序需要连接并从该执行者那里询问某些内容时,它会使用此 IP 进行尝试。但这个 IP 实际上是整个集群的外部 IP 地址,而不是单个数据节点 IP。
有没有办法让纱线通知执行者(块管理器)及其私有 IP。因为,他们的私有 IP 也可以从这个 Spark 驱动程序正在使用的机器访问。
当化简器任务(对于ShuffleDependency
)无法获取随机块时会抛出FetchFailedException
异常。这通常意味着执行者(带有洗牌块的BlockManager
)死亡,因此例外:
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093
执行程序可以 OOMed(= 抛出内存不足错误)或 YARN 由于内存使用过多而决定终止它。
您应该使用 yarn logs 命令查看 Spark 应用程序的日志,并找出问题的根本原因。
yarn logs -applicationId <application ID> [options]
还可以在 Web UI 的"执行程序"选项卡中查看 Spark 应用程序的执行程序的状态。
Spark 通常通过重新运行受影响的任务来从FetchFailedException
中恢复。使用 Web UI 查看 Spark 应用程序的性能。FetchFailedException
可能是由于临时记忆"打嗝"。
这是火花中的已知错误,仍在 2.1.0 版 https://issues.apache.org/jira/browse/SPARK-5928