Observable.map 抛出的错误未在订阅中捕获



这是代码:

public void initiateProcess() {
findAbandonedOpenOrders() //returns Observable<AsyncN1QueryResult>
.flatMap(results -> results.rows())
.map(row -> row.value())
.map(s -> processStringToGetOrderId(s.toString()))
.map(
o -> {
log.info("Generating access token for orderId: {}", o);
return identityConnector
.getServiceTokenFromIdentity()
.map(
issueToken ->
RequestInputModel.builder()
.authorisationToken(issueToken.getAccessToken())
.orderId(o)
.build())
.map(
requestInputModel -> {
log.info(
"Invoking cancel order for orderId: {}",
requestInputModel.getOrderId());
return cancelOrderApiConnector
.invokeAPI(
requestInputModel,
RequestInputModel.RequestBodyModel.builder().build())  //throw RuntimeException as soon as the flow enters this method
.subscribe();
})
.subscribe();
})
.subscribe(
s -> {},
e -> {
log.error(ExceptionUtils.getStackTrace(e));
});
}

一旦我调用cancelOrderApiConnector.invokeAPI,它就会抛出运行时异常。因此,直接地图应该抛出它,而该地图又应该由最外层的地图抛出。但这并没有发生。堆栈跟踪不会打印(逻辑在订阅中实现(。

有人可以告诉我我可能做错了什么吗?

不要破坏反应链,将这些map(v -> codeThatProduceAFlux.subscribe())替换为flatMap(v -> codeThatProduceAFlux)错误将在子步骤之间正确传播

最新更新