我使用Vert实现websockets。x 3 .
场景很简单:从客户端打开套接字,在顶点顶点工作器上做一些"阻塞"工作,当完成对客户端的响应时(通过打开的套接字)
请告诉我我做得对不对:
VertxWebsocketServerVerticle创建。一旦websocket打开并请求来自客户端,我就使用eventBus并将消息传递给
EventBusReceiverVerticle。这里我正在做阻塞操作。
我实际上是如何将响应发送回VertxWebsocketServerVerticle并将其发送回客户端?
代码:主类:
public static void main(String[] args) throws InterruptedException {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new EventBusReceiverVerticle("R1"),new DeploymentOptions().setWorker(true));
vertx.deployVerticle(new VertxWebsocketServerVerticle());
}
VertxWebsocketServerVerticle:
public class VertxWebsocketServerVerticle extends AbstractVerticle {
public void start() {
vertx.createHttpServer().websocketHandler(webSocketHandler -> {
System.out.println("Connected!");
Buffer buff = Buffer.buffer().appendInt(12).appendString("foo");
webSocketHandler.writeFinalBinaryFrame(buff);
webSocketHandler.handler(buffer -> {
String inputString = buffer.getString(0, buffer.length());
System.out.println("inputString=" + inputString);
vertx.executeBlocking(future -> {
vertx.eventBus().send("anAddress", inputString, event -> System.out.printf("got back from reply"));
future.complete();
}, res -> {
if (res.succeeded()) {
webSocketHandler.writeFinalTextFrame("output=" + inputString + "_result");
}
});
});
}).listen(8080);
}
@Override
public void stop() throws Exception {
super.stop();
}
}
EventBusReceiverVerticle:
public class EventBusReceiverVerticle extends AbstractVerticle {
private String name = null;
public EventBusReceiverVerticle(String name) {
this.name = name;
}
public void start(Future<Void> startFuture) {
vertx.eventBus().consumer("anAddress", message -> {
System.out.println(this.name +
" received message: " +
message.body());
try {
//doing some looong work..
Thread.sleep(10000);
System.out.printf("finished waitingn");
startFuture.complete();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
我总是得到:
WARNING: Message reply handler timed out as no reply was received - it will be removed
github项目:https://github.com/IdanFridman/VertxAndWebSockets谢谢你!射线。
由于您正在阻塞websocket处理程序,直到它接收到发送到EventBus的消息的回复,实际上,直到设置延迟10s圈才会收到,您肯定会收到警告,因为事件总线的回复处理程序将超时->消息发送但在超时延迟之前没有收到响应。
实际上我不知道你是否只是在试验Vert。x工具包或者您正在尝试满足某些需求,但是您必须调整您的代码以匹配版本。x 精神:
- 首先,你最好不要阻塞,直到你的websocket处理程序收到消息,记住,当涉及到Vert.x时,一切都是异步的。
- 为了睡眠一段时间,使用Vert。x方式而不是
Thread.sleep(delay)
,即vertx.setTimer(...)
。