Vertx Rx Java:取消订阅eventBus订阅者的原因



我将vertx与rx-java一起使用。

我有一个垂直订阅eventBus上的事件的特定地址:

eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
...
message.reply(...);
})
... same for other addresses...

其他垂直市场使用发送事件

eventBus.rxSend(some_address, message, new DeployOptions().setSendTimeout(60000));

顶点是通过RxHelper.deployVerticle创建的。

一切都很好,但过了一段时间,其中一个地址被取消订阅,所有对此事件的请求现在都失败了,并出现ReplyException: No handlers for address some_ddress错误,所有其他地址仍然被订阅。

我在日志中没有看到任何vertx错误。

是什么原因导致消费者从其正在收听的特定地址自动取消订阅?

据我所知:如果请求因错误或超时而失败,就不应该导致取消订阅,所以我真的不明白是什么导致了这种行为。(我根本没有任何明确的unsubscribe调用(

问题似乎是message.reply之前的代码有时会抛出异常:

eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
... <-- exception here
message.reply(...);
})

简单修复:

eventBus.localConsumer(some_addres)
.toObservable()
.subscribe(message -> {
try {
... <-- exception here
message.reply(...);
} catch (Exception e) {
...handle exception...
message.error(...);
}
})

最新更新