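I want to create a server socket that listens on a specific host whose IP and hostname I know ahead of time (it shows up under that hostname in the YARN node list). But I can't seem to get the receiver to listen on that host without letting it fail an arbitrary number of times first.
There is a Flume receiver that has the kind of host-specific functionality I'm looking for.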
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
My receiver code:
class TCPServerReceiver(hostname: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
  }

  private def receive() {
    /* This is where the job fails until it happens to start on the correct host */
    val server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
    var userInput: String = null
    while (true) {
      try {
        val s = server.accept()
        val in = new BufferedReader(new InputStreamReader(s.getInputStream()))
        userInput = in.readLine()
        while (!isStopped && userInput != null) {
          store(userInput)
          userInput = in.readLine()
        }
      } catch {
        case e: java.net.ConnectException =>
          restart("Error connecting to " + port, e)
        case t: Throwable =>
          restart("Error receiving data", t)
      }
    }
  }
}
Then, to test it while it's running:
echo 'this is a test' | nc <hostname> <port>
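This all works when I run it as a local client, but when it is submitted to a YARN cluster, the logs show it trying to run in other containers on different hosts, and all of them fail because the hostname doesn't match the container's host: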
java.net.BindException: Cannot assign requested address
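Eventually (after several minutes), once the receiver tries to start on the correct host, it does create the socket. But that is a lot of "startup time", and I'm worried that adding more nodes will make it take even longer!
Is there a way to ensure that this receiver starts on the correct host on the first try?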
The custom TCPServerReceiver implementation should also implement:
def preferredLocation: Option[String]
Override this to specify a preferred location (hostname).
In this case, something like:
def preferredLocation = Some(hostname)
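Put together, the receiver might look roughly like the sketch below. This is only an illustration under a few assumptions, not a definitive implementation: it drops the Logging mixin (its package differs between Spark versions), closes the sockets in finally blocks, and treats preferredLocation as a scheduling hint, so the hostname presumably has to match the name Spark uses for the executor's host, and an executor has to be running there.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{InetAddress, ServerSocket}
import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TCPServerReceiver(hostname: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Scheduling hint: ask Spark to start this receiver on `hostname`.
  override def preferredLocation: Option[String] = Some(hostname)

  def onStart(): Unit = {
    // Receive on a separate thread so that onStart() returns immediately.
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to clean up in this sketch. Note that accept() blocks, so a
  // fuller version would keep a reference to the server socket and close it here.
  def onStop(): Unit = {}

  private def receive(): Unit = {
    var server: ServerSocket = null
    try {
      // Throws java.net.BindException if `hostname` is not a local address.
      server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
      while (!isStopped()) {
        val socket = server.accept()
        try {
          val in = new BufferedReader(
            new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = in.readLine()
          while (!isStopped() && line != null) {
            store(line)
            line = in.readLine()
          }
        } finally {
          socket.close()
        }
      }
    } catch {
      case NonFatal(e) => restart("Error receiving data", e)
    } finally {
      if (server != null) server.close()
    }
  }
}
```

It would then be registered with something like ssc.receiverStream(new TCPServerReceiver("worker-03.example.com", 9999)), where the hostname and port are placeholders for the chosen machine and port.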