Spring数据库,无法使用响应式编程按id删除文档



最近我们决定在我们的项目中使用带有couchbase的spring-webflux,我们需要帮助解决反应式编程中的以下用例

  1. 验证请求并将其保存在Bucket1 couchbase中(我们使用了javax.validation和springReactiveCouchbaseRepository)
  2. 调用外部服务(我们使用Web客户端调用API.

    • 成功后,

      • 将AUDIT文档写入Bucket2
      • 获取插入Bucket1中的文档并发送相同的响应
      • 将审核文档写入Bucket2
    • 一旦发生故障,

      • 将AUDIT文档写入Bucket2
      • 删除插入BUCKET1中的文档并引发异常
      • 将审核文档写入Bucket2

我们已经编写了一个服务类,并使用两个存储库类将文档保存到couchbase,以及一个Web客户端来调用外部服务。

我们的服务类方法业务逻辑如下所示。

{
//1. Validate the request and throw the error
List<String> validationMessages = handler.validate(customerRequest);
if (validationMessages != null && !validationMessages.isEmpty()) {
return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
}
//generate the id, set it to the request and save it to BUCKET1
String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
customerRequest.setcustomerRequestId(customerRequestId);
customerRequestMono = bucket1Repository.save(customerRequest);

//2. Call the external service using webclient
externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);
//2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
externalServiceResponse.subscribe(response -> {
//Initialise the success audit bean and save
//2.1 a) Write Audt to BUCKET2
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
}, errorResp -> {
//2.2 a) Write Audt to BUCKET2
//Initialise the error audit bean and save
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
//2.2 b)Delete the inserted
Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
});
//Get the loan account id and return the same
finalResponse = bucket1Repository.findByCustomerId(customerId);
return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
.doOnSuccess(resp -> {
try {
finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
})
.doOnError(error -> {
try {
finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
});
}

我们观察到的几个问题是

  1. 在某些情况下,只有我们订阅了文档,文档才会持久化。这是预期的行为吗?我们是否需要订阅要保存的文档
  2. 出现错误时无法删除文档
  3. 我也知道我没有遵循上面的纯反应式编程。请帮助我提供任何指针,以便在reactive中有效地编写代码

请帮助我们提供任何指针

获取上面的代码块:

externalServiceResponse.subscribe(response -> {
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
}, errorResp -> {
Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
});

它有两个反应式编程问题:

  1. 您正在创建未订阅的Monos,因此它们永远不会执行
  2. 无论如何,您不应该在subscribe中创建它们,而是使用flatMap或onErrorResume来链接它们,预订阅

这样的东西应该会起作用(原谅我,我还没有测试过,所以你可能需要做一些调整):

externalServiceResponse
// If something goes wrong then delete the inserted doc
.onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))
// Always want to save the audit regardless
.then(bucket2Repository.save(cfeAudit))
.subscribe();

代码中还有其他问题需要解决,例如,在订阅最终的Mono之前,您似乎想将多个Mono平面映射在一起,但希望这能让您开始。

最新更新