我有一个发送者和一个交换消息的消费者:
public class Sender extends AbstractVerticle {
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
}
public class Consumer extends AbstractVerticle{
protected EventBus eventBus = null;
@Override
public void start() {
eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
}));
}
}
我希望当消费者回复消息时,发件人会收到它。但是,我得到一个超时:
Successfully sent reply
Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1
部署:
public class ServiceLauncher {
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) {
vertx.deployVerticle(new Consumer(), res -> {
if (res.succeeded()) {
System.out.println("Verticle " + Consumer.NAME + " deployed.");
vertx.deployVerticle(new Sender());
System.out.println("Verticle " + Sender.NAME + " deployed.");
} else {
System.out.println("Verticle " + Consumer.NAME + " not deployed.");
}
});
}
我做错了什么?提前感谢
更新:问题出在msg.reply(( - 消费者没有回复消息,但我仍然无法弄清楚原因。
超时不是发生在请求的发送方,而是发生在其接收方。处理程序(在 msg.reply()
中定义(等待发件人的下一次回复。它不是处理程序,仅确认发送状态。Sender
eventBus.send()
中的处理程序也会在发送方收到回复时触发。
只需删除msg.reply()
中的处理程序,并以相同的方式修改Sender
中的处理程序eventBus.send()
:
public class Sender extends AbstractVerticle {
public static final String NAME = "SENDER";
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully received reply: " + res.result().body());
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
}
}
和
public class Consumer extends AbstractVerticle {
public static final String NAME = "CONSUMER";
@Override
public void start() {
final EventBus eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> {
System.out.println("Message received");
msg.reply("Hi from consumer.");
});
}
}
执行后,您将看到:
Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.
我也有类似的问题。就我而言,我发送了一个非 JsonObject 回复。消息必须使用有效的 JsonObject 回复 - 而不是 JsonArray 或任何其他。这看起来是默认行为,尽管文档提到 JsonObject 不是必需的。但是原始代码片段中真正的问题是,您已经为回复的回复指定了一个处理程序。消费者回复成功,但消费者未收到发件人的回复。请参阅下面的评论。
@Override
public void start() {
eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
if (res.succeeded()) { //this is expecting a response from Sender which never does so this will never execute.
System.out.println("Successfully sent reply");
} else { //it's not failing either so this will not execute either
System.out.println("Failed to send reply." + res.cause());
}
}));
}