垂直中的以下代码在事件总线上输入消息
io.vertx.reactivex.core.Vertx rxVertx = io.vertx.reactivex.core.Vertx.newInstance(vertx);
Single<Message<Integer>> reply = rxVertx.eventBus().<Integer>rxSend("address", "param");
reply.subscribe(r -> {
// Do something with value
});
另一个垂直的消息消耗了消息:
vertx.eventBus().<Integer>consumer("address", h -> {
Integer integer = ... // call to getValue() method
h.reply(integer);
});
返回的值来自使用SQLClient的MySQL数据库。当前,检索该值的代码在以下方法中:
private Single<Integer> getValue() {
return Single.create(source -> {
mySQLClient.getConnection(res -> {
if (res.succeeded()) {
SQLConnection connection = res.result();
connection.query("SELECT count(*) from myTable", result -> {
if (result.succeeded()) {
Integer i = result.result().getRows().get(0));
source.onSuccess(i);
}
});
} else {
source.onError(res.cause());
}
});
});
}
从消费者内部调用getValue()
方法的正确方法是什么?
以下内容:
vertx.eventBus().<Integer>consumer("address", h -> {
Single<integer> single = getValue();
h.reply(single.subscribe(
s -> System.out.println(s));
});
在h.reply
返回之前打印出值,但是如何从single.subscribe()
返回值,以使其成为h.reply()
的参数?
谢谢
您需要订阅结果并通过结果以回复。您已经倒了语法。
getvalue().subscribe(result -> {
h.reply(result);
}, ex -> {
//Handle error
});