这是代码:
public void initiateProcess() {
findAbandonedOpenOrders() //returns Observable<AsyncN1QueryResult>
.flatMap(results -> results.rows())
.map(row -> row.value())
.map(s -> processStringToGetOrderId(s.toString()))
.map(
o -> {
log.info("Generating access token for orderId: {}", o);
return identityConnector
.getServiceTokenFromIdentity()
.map(
issueToken ->
RequestInputModel.builder()
.authorisationToken(issueToken.getAccessToken())
.orderId(o)
.build())
.map(
requestInputModel -> {
log.info(
"Invoking cancel order for orderId: {}",
requestInputModel.getOrderId());
return cancelOrderApiConnector
.invokeAPI(
requestInputModel,
RequestInputModel.RequestBodyModel.builder().build()) //throw RuntimeException as soon as the flow enters this method
.subscribe();
})
.subscribe();
})
.subscribe(
s -> {},
e -> {
log.error(ExceptionUtils.getStackTrace(e));
});
}
一旦我调用cancelOrderApiConnector.invokeAPI
,它就会抛出运行时异常。因此,直接地图应该抛出它,而该地图又应该由最外层的地图抛出。但这并没有发生。堆栈跟踪不会打印(逻辑在订阅中实现(。
有人可以告诉我我可能做错了什么吗?
不要破坏反应链,将这些map(v -> codeThatProduceAFlux.subscribe())
替换为flatMap(v -> codeThatProduceAFlux)
错误将在子步骤之间正确传播