如何将值从RXJAVA单人返回到VERTX事件总线消费者



垂直中的以下代码在事件总线上输入消息

io.vertx.reactivex.core.Vertx rxVertx = io.vertx.reactivex.core.Vertx.newInstance(vertx);
Single<Message<Integer>> reply = rxVertx.eventBus().<Integer>rxSend("address", "param");
reply.subscribe(r -> {
   // Do something with value
});

另一个垂直的消息消耗了消息:

vertx.eventBus().<Integer>consumer("address", h -> {
    Integer integer = ... // call to getValue() method
    h.reply(integer);
});

返回的值来自使用SQLClient的MySQL数据库。当前,检索该值的代码在以下方法中:

private Single<Integer> getValue() {
    return Single.create(source -> {
        mySQLClient.getConnection(res -> {
            if (res.succeeded()) {
                SQLConnection connection = res.result();
                connection.query("SELECT count(*) from myTable", result -> {
                    if (result.succeeded()) {
                        Integer i = result.result().getRows().get(0));
                        source.onSuccess(i);
                    }
                });
            } else {
                source.onError(res.cause());
            }
        });
    });
}

从消费者内部调用getValue()方法的正确方法是什么?

以下内容:

vertx.eventBus().<Integer>consumer("address", h -> {
    Single<integer> single = getValue();
    h.reply(single.subscribe(
       s -> System.out.println(s));
});

h.reply返回之前打印出值,但是如何从single.subscribe()返回值,以使其成为h.reply()的参数?

谢谢

您需要订阅结果并通过结果以回复。您已经倒了语法。

getvalue().subscribe(result -> {
    h.reply(result);
}, ex -> {
    //Handle error
});

最新更新