When submitting a Spark Streaming receiver, how do I specify the host without "failing through"?



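I want to create a server socket that listens on one specific host, whose IP and hostname I know in advance (it appears under that hostname in the YARN node list). But I can't seem to get the receiver to listen on that host without letting it fail on arbitrary other hosts first, until it happens to land on the right one.

There is a Flume receiver that has the kind of host-specific behaviour I want: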

FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

My receiver code:

import java.io.{BufferedReader, InputStreamReader}
import java.net.{InetAddress, ServerSocket}

import org.apache.spark.Logging // location of the Logging trait in Spark 1.x (assumed version)
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TCPServerReceiver(hostname: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
  }

  private def receive() {
    /* This is where the job fails until it happens to start on the correct host */
    val server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
    var userInput: String = null
    while (true) {
      try {
        // Block until a client connects, then store each line it sends
        val s = server.accept()
        val in = new BufferedReader(new InputStreamReader(s.getInputStream()))
        userInput = in.readLine()
        while (!isStopped && userInput != null) {
          store(userInput)
          userInput = in.readLine()
        }
      } catch {
        case e: java.net.ConnectException =>
          restart("Error connecting to " + port, e)
        case t: Throwable =>
          restart("Error receiving data", t)
      }
    }
  }
}

Then, to test it while it is running:

echo 'this is a test' | nc <hostname> <port>

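This all works when I run it as a local client, but when it is submitted to a YARN cluster, the logs show it trying to run in other containers on different hosts, and all of those attempts fail because the hostname does not match the container's host: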

java.net.BindException: Cannot assign requested address
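
For context, this is the error a ServerSocket raises whenever it is asked to bind to an address that is not assigned to the machine it runs on. The following is a minimal sketch, independent of Spark, that reproduces it. The object name BindDemo and the helper tryBind are hypothetical, and 192.0.2.1 is a documentation-only address assumed not to be configured on the local machine:

```scala
import java.net.{BindException, InetAddress, ServerSocket}

object BindDemo {
  // Tries to bind a server socket to host:port.
  // Returns Right(boundPort) on success, Left(errorMessage) on a BindException.
  def tryBind(host: String, port: Int): Either[String, Int] =
    try {
      val server = new ServerSocket(port, 50, InetAddress.getByName(host))
      try Right(server.getLocalPort) finally server.close()
    } catch {
      case e: BindException => Left(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    // Loopback belongs to this machine, so binding should succeed (port 0 = any free port).
    println(tryBind("127.0.0.1", 0))
    // 192.0.2.1 is assumed not to belong to this machine, so binding should fail.
    println(tryBind("192.0.2.1", 0))
  }
}
```

This is the situation the receiver is in when it is launched in a container on any host other than the one named by hostname.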

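Eventually (after several minutes), once the receiver is tried on the correct host, it does create the socket. But that is a lot of "start-up time", and I worry that adding more nodes will make it take even longer!

Is there a way to ensure that this receiver starts on the correct host on the first attempt?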

The custom TCPServerReceiver implementation should also override:

def preferredLocation: Option[String]

Override this to specify a preferred location (hostname).

In this case, something like:

override def preferredLocation: Option[String] = Some(hostname)
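
Put together, the receiver might look roughly like the sketch below. It is an illustration under assumptions, not a drop-in replacement: the class name PinnedTCPServerReceiver is hypothetical, the Logging trait is left out, and the socket clean-up in onStop is an addition that the original code does not have. Note also that preferredLocation is a scheduling preference, so the hostname has to match the name under which the cluster knows that node.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{InetAddress, ServerSocket}
import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical name; a sketch of the question's receiver with a placement hint.
class PinnedTCPServerReceiver(hostname: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Scheduling hint: ask Spark to launch this receiver on `hostname`.
  override def preferredLocation: Option[String] = Some(hostname)

  @volatile private var server: ServerSocket = _

  override def onStart(): Unit = {
    // Receive on a separate thread so that onStart returns immediately.
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Closing the server socket unblocks accept() so the receiving thread can exit.
  override def onStop(): Unit = {
    val s = server
    if (s != null) s.close()
  }

  private def receive(): Unit = {
    try {
      server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
      while (!isStopped()) {
        val client = server.accept()
        try {
          val in = new BufferedReader(
            new InputStreamReader(client.getInputStream, StandardCharsets.UTF_8))
          var line = in.readLine()
          while (!isStopped() && line != null) {
            store(line)
            line = in.readLine()
          }
        } finally {
          client.close()
        }
      }
    } catch {
      // Ask Spark to restart the receiver unless it is already shutting down.
      case NonFatal(e) =>
        if (!isStopped()) restart("Error receiving data", e)
    } finally {
      val s = server
      if (s != null) s.close()
    }
  }
}
```

It would be registered in the usual way for a custom receiver, for example streamingContext.receiverStream(new PinnedTCPServerReceiver([chosen machine's hostname], [chosen port])).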
