如何使用web客户端在循环中调用rest api,并在java中以非阻塞的方式处理所有响应



我正在尝试使用WebClient在循环中调用rest api,并以非阻塞的方式处理响应
用例如下,
我需要调用一个最多返回7天(startDate,endDate(数据的rest api,但我收到的输入日期范围可以是任何时期,如30天、6个月、1年等,我需要相应地调用api
因此,例如,如果我的输入日期范围为30天,那么我需要调用api 5次才能获得所需的数据
假设我需要找到2022-09-01至2022-09-30期间贷款金额最低的人。我可以使用这个api getLoanDetails(startDate,endDate(,它只接受7天内的日期。所以我需要调用这个api 5次(2022-09-01到2022-09-07、2022-09-08到2022-09-14、2022-09-15到2022-09-21、2022-09-22到2022-09-38和2022-09-29到2022-02-09-30(

我创建了一个从输入日期起7天的日期列表,并在其上运行一个循环,并在循环中使用WebClient调用api。我需要解析json响应,从中创建Person对象,并将所有loanAmount最小的人收集到一个列表中(因为可能有很多人拥有相同的loanAmounts(
record Person(String name, BigDecimal loanAmount, LocalDate dateOfBirth) {}

Flux<Person> findPeopleWithMinimumLoan(LocalDate startDate, LocalDate endDate) {
List<Pair<LocalDate>> dateList = getDates(startDate,endDate);
for (Pair<LocalDate> periodToFindLoanDetails:dateList) { 
Mono<List<Person>> personsWithMinimumLoanAmount = 
callLoanApi(periodToFindLoanDetails.startDate(), periodToFindLoanDetails.endDate())
.map(this::findPeopleWithMinimumLoanAmount);
**//TODO: Compare the result of all the loan api calls in a non blocking way and 
return Flux<Person> or Mono<List<Person>>**
}
Mono<String> callLoanApi(LocalDate startDate, LocalDate endDate) { 
return WebClient.create(apiURI)
.get()
.uri(uriBuilder -> uriBuilder.queryParam("aoi_key", apiKey)
.queryParam("start_date", startDate)
.queryParam("end_date", endDate).build())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
}
List<Person> findPeopleWithMinimumLoanAmount(String jsonFromLoanApi)

现在,我正在努力使用响应式数据结构来以非阻塞的方式处理来自所有api调用的响应。我当然可以使用block((,从每次调用中获得结果,并将其保存在某个中间列表中,然后与下一个结果进行比较,并不断寻找最小值,直到我得到所有结果。以非阻塞和高效的方式做到这一点的理想/更好的方法是什么。提前感谢您仔细阅读我的问题,并对长问题或问题不清楚表示歉意。请在评论中告诉我,我可以纠正。我省略了一些代码,因为它与IMO.无关

这是一个非常宽泛的问题,具体答案取决于您的环境。假设以这种方式调用您正在使用的API对于API的所有者来说是可以的,并且它可以应付负载。

链到具有subscribeflatMapthen变体之一的成功返回的Mono。如果要执行批量插入,请组合Mono并将它们压缩在一起,例如使用zipWith或手动。

对于一个简单的用例,这将产生类似于(为了简洁起见,使用Kotlin(的结果:

@Transactional
fun save(todo: Todo): Mono<Todo> {
return todoRepository.save(mapToEntity(todo))
.flatMap { saved -> updateTasks(todo, saved) }
}

https://github.com/AndreasKl/todo-reactive/blob/main/src/main/kotlin/todoservice/todo/TodoUseCases.kt#L36

经过一番挖掘,我找到了一个可能的解决方案,如下所示,我没有使用for循环多次调用loanApi,而是使用Flux.parallel进行同样的调用,代码如下所示

ParallelFlux<List<Person>> findPeopleWithMinimumLoan(LocalDate startDate, LocalDate endDate) {
List<Pair<LocalDate>> dateList = getDates(startDate,endDate);
return Flux.fromIterable(dateList)
.parallel()
.flatMap(period -> callLoanApi(period.startDate(), period.endDate()))
.map(this::parseJsonResponse)
.flatMap(people -> Flux.fromIterable(people))
.collect(ArrayList::new, collectPeopleFromAllCalls);
}
Mono<String> callLoanApi(LocalDate startDate, LocalDate endDate) { 
return WebClient.create(apiURI)
.get()
.uri(uriBuilder -> uriBuilder.queryParam("aoi_key", apiKey)
.queryParam("start_date", startDate)
.queryParam("end_date", endDate).build())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(String.class);
}
List<Person> parseJsonResponse(String jsonFromLoanApi)

最新更新