垂直:消息回复超时



我有一个发送者和一个交换消息的消费者:

public class Sender extends AbstractVerticle {
@Override
public void start() {
    EventBus eventBus = vertx.eventBus();
    eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
        if (res.succeeded()) {
            System.out.println("Successfully sent reply");
        } else {
            System.out.println("Failed to send reply." + res.cause());
        }
    });
    eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
}
public class Consumer extends AbstractVerticle{
protected EventBus eventBus = null;
@Override
public void start() {
    eventBus = vertx.eventBus();
    eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
        if (res.succeeded()) {
            System.out.println("Successfully sent reply");
        } else {
            System.out.println("Failed to send reply." + res.cause());
        }
    }));
}

}

我希望当消费者回复消息时,发件人会收到它。但是,我得到一个超时:

Successfully sent reply
Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1

部署:

public class ServiceLauncher { 
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) {
    vertx.deployVerticle(new Consumer(), res -> {
        if (res.succeeded()) {
            System.out.println("Verticle " + Consumer.NAME + " deployed.");
            vertx.deployVerticle(new Sender());
            System.out.println("Verticle " + Sender.NAME + " deployed.");
        } else {
            System.out.println("Verticle " + Consumer.NAME + " not deployed.");
        }
    });
}

我做错了什么?提前感谢

更新:问题出在msg.reply(( - 消费者没有回复消息,但我仍然无法弄清楚原因。

超时不是发生在请求的发送方,而是发生在其接收方。处理程序(在 msg.reply() 中定义(等待发件人的下一次回复。它不是处理程序,仅确认发送状态。Sender eventBus.send()中的处理程序也会在发送方收到回复时触发。

只需删除msg.reply()中的处理程序,并以相同的方式修改Sender中的处理程序eventBus.send()

public class Sender extends AbstractVerticle {
    public static final String NAME = "SENDER";
    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully received reply: " + res.result().body());
            } else {
                System.out.println("Failed to send reply." + res.cause());
            }
        });
    }
}

public class Consumer extends AbstractVerticle {
    public static final String NAME = "CONSUMER";
    @Override
    public void start() {
        final EventBus eventBus = vertx.eventBus();
        eventBus.consumer(Constants.ADDRESS, msg -> {
            System.out.println("Message received");
            msg.reply("Hi from consumer.");
        });
    }
}

执行后,您将看到:

Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.

我也有类似的问题。就我而言,我发送了一个非 JsonObject 回复。消息必须使用有效的 JsonObject 回复 - 而不是 JsonArray 或任何其他。这看起来是默认行为,尽管文档提到 JsonObject 不是必需的。但是原始代码片段中真正的问题是,您已经为回复的回复指定了一个处理程序。消费者回复成功,但消费者未收到发件人的回复。请参阅下面的评论。

@Override
public void start() {
    eventBus = vertx.eventBus();
    eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
        if (res.succeeded()) { //this is expecting a response from Sender which never does so this will never execute.
            System.out.println("Successfully sent reply");
        } else { //it's not failing either so this will not execute either
            System.out.println("Failed to send reply." + res.cause());
        }
    }));
}

最新更新