Active master not accepting new applications if one of the masters registered with Zookeeper goes down



I am facing a very strange issue while implementing high availability (HA) in a Spark standalone cluster.

I have configured 3 Spark masters and registered them with Zookeeper using the following steps:

  1. Created a configuration file ha.conf with the following contents:

    spark.deploy.recoveryMode=ZOOKEEPER
    spark.deploy.zookeeper.url=zk_host:2181
    spark.deploy.zookeeper.dir=/spark

  2. Started all 3 masters by passing this properties file as an argument to the start-master script, as follows:

    ./start-master.sh -h localhost -p 17077 --webui-port 18080 --properties-file ha.conf
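The other two masters were started the same way on their own hosts. I have not reproduced the exact commands here; the following is an assumed sketch, with the hostnames and ports (h2:27077, h3:37077) taken from the master URL used in the submit command further down, and the web UI ports picked arbitrarily:

    # On h2 (port taken from the master URL h2:27077; web UI port is an assumption)
    ./start-master.sh -h h2 -p 27077 --webui-port 28080 --properties-file ha.conf
    # On h3 (port taken from the master URL h3:37077; web UI port is an assumption)
    ./start-master.sh -h h3 -p 37077 --webui-port 38080 --properties-file ha.conf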

This way I started all 3 Spark masters and registered them with Zookeeper.
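To confirm the registration, one quick check (assuming a standard ZooKeeper installation with zkCli.sh available; the exact znode names can differ between Spark versions) is to list the directory configured via spark.deploy.zookeeper.dir:

    # Connect to the ensemble configured in ha.conf
    zkCli.sh -server zk_host:2181
    # Inside the CLI, list the Spark HA directory; entries such as leader_election
    # and master_status should appear once the masters have registered
    ls /spark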

Working: if I kill the active master, all running applications are picked up by the new active master.

Not working: if any one of the Spark masters (e.g. localhost:17077) is down/unreachable and I submit an application using the following command:

    ./bin/spark-submit --class WordCount --master spark://localhost:17077,h2:27077,h3:37077 --deploy-mode cluster --conf spark.cores.max=1 ~/TestSpark-0.0.1-SNAPSHOT.jar /user1/test.txt

Ideally, the submission should go to the active master and everything should work, since one master is still up, but instead I get an exception like this:

Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
    at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:230)
    at org.apache.spark.deploy.Client$$anonfun$7.apply(Client.scala:230)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.deploy.Client$.main(Client.scala:230)
    at org.apache.spark.deploy.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to connect to localhost/127.0.0.1:17077
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
        at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:191)
        at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: localhost/127.0.0.1:17077
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 more

Any help/clue/suggestion would be highly appreciated. Please help me understand this; I searched for similar issues but could not find anything.

UPDATE

I face this issue only when I submit the application in cluster mode; if I submit it in client mode, there is no problem.
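For reference, the client-mode submission that worked is just the cluster-mode command from above with --deploy-mode changed (shown here as a reconstruction, since I did not record the exact command):

    # Same submission as above, but with --deploy-mode client (the mode that worked)
    ./bin/spark-submit --class WordCount --master spark://localhost:17077,h2:27077,h3:37077 --deploy-mode client --conf spark.cores.max=1 ~/TestSpark-0.0.1-SNAPSHOT.jar /user1/test.txt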

Applications can be submitted to the Spark REST server, which runs on port 6066, instead of to the legacy endpoint running on port 7077.
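If the REST endpoint is not already listening on port 6066, it may need to be enabled on each master. Depending on the Spark version this is controlled by the spark.master.rest.enabled and spark.master.rest.port properties, which could be added to the same ha.conf passed to start-master.sh; this is only a sketch, not a change I had to make in my setup:

    # Assumed additions to ha.conf; whether these are needed (and their defaults)
    # depends on the Spark version
    spark.master.rest.enabled=true
    spark.master.rest.port=6066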

So the issue is resolved by submitting the application to the REST server with the following command:

    ./bin/spark-submit --class WordCount --master spark://localhost:6066,h2:6066,h3:6066 --deploy-mode cluster --conf spark.cores.max=1 ~/TestSpark-0.0.1-SNAPSHOT.jar /user1/test.txt

Now, if one Spark master goes down, the application is submitted to another Spark master.
