Java异步启动多个CompletableFuture并等待它们完成



我有一个客户端列表,我想从外部api收集这些客户端的数据。我想启动多个线程来收集数据并等待所有线程完成,如果每个线程在一定时间内未完成,我想将其保存在数据库中。我使用CompletableFuture.allOf

我的代码是这样的

public void fetchDataForAllClients() {
String previousDate = DateUtils.getPreviousDate();
List<Integer> clientIdList = PropertiesUtil.getClientIdList();
CompletableFuture.allOf(clientIdList.stream()
.map(clientId -> fetchData(previousDate, clientId)
.exceptionally(e -> {
LOGGER.error(e.getMessage(), e);
return null;
})
.thenAcceptAsync(s -> System.out.println(s + ". FetchDataThread Finished for "+ clientId + " at " + LocalDateTime.now())))
.toArray(CompletableFuture<?>[]::new))
.join();
}
@Async
CompletableFuture<Integer> fetchData(final String date, final Integer clientId) {
counter++;
System.out.println(counter + ". FetchDataThread Started for "+ clientId + " at " + LocalDateTime.now());
boolean failed = false;
String errorMsg = null;
try {
myApiService.fetchDataForClient(clientId, date, date);
} catch (MyApiException exception) {
failed = true;
errorMsg = exception.getMessage();
}
fetchStatsService.createFetchStats(clientId, date, failed, errorMsg);
return CompletableFuture.completedFuture(counter);
}

这个问题是,它不启动fetchData(previousDate, clientId)在异步。按顺序运行。

@Aync将无法工作,如果它从同一类中调用,因为它将调用原始方法而不是拦截的方法,所以更改fetchData方法返回Integer然后使用compleatableFuture.supplyAsync()调用方法实际上会生成新线程来执行该方法

List<CompleatbleFutures> futures= clientIdList.stream()
.map(id->CompleatbleFutures.supplyAsync(fetchdata(..)))
.collect(Collectors.toList());
CompleatbleFuture.allOf(futures.toArray(futures.size));

最新更新