我有一个客户端列表,我想从外部api收集这些客户端的数据。我想启动多个线程来收集数据并等待所有线程完成,如果每个线程在一定时间内未完成,我想将其保存在数据库中。我使用CompletableFuture.allOf
我的代码是这样的
public void fetchDataForAllClients() {
String previousDate = DateUtils.getPreviousDate();
List<Integer> clientIdList = PropertiesUtil.getClientIdList();
CompletableFuture.allOf(clientIdList.stream()
.map(clientId -> fetchData(previousDate, clientId)
.exceptionally(e -> {
LOGGER.error(e.getMessage(), e);
return null;
})
.thenAcceptAsync(s -> System.out.println(s + ". FetchDataThread Finished for "+ clientId + " at " + LocalDateTime.now())))
.toArray(CompletableFuture<?>[]::new))
.join();
}
@Async
CompletableFuture<Integer> fetchData(final String date, final Integer clientId) {
counter++;
System.out.println(counter + ". FetchDataThread Started for "+ clientId + " at " + LocalDateTime.now());
boolean failed = false;
String errorMsg = null;
try {
myApiService.fetchDataForClient(clientId, date, date);
} catch (MyApiException exception) {
failed = true;
errorMsg = exception.getMessage();
}
fetchStatsService.createFetchStats(clientId, date, failed, errorMsg);
return CompletableFuture.completedFuture(counter);
}
这个问题是,它不启动fetchData(previousDate, clientId)
在异步。按顺序运行。
@Aync将无法工作,如果它从同一类中调用,因为它将调用原始方法而不是拦截的方法,所以更改fetchData方法返回Integer然后使用compleatableFuture.supplyAsync()调用方法实际上会生成新线程来执行该方法
List<CompleatbleFutures> futures= clientIdList.stream()
.map(id->CompleatbleFutures.supplyAsync(fetchdata(..)))
.collect(Collectors.toList());
CompleatbleFuture.allOf(futures.toArray(futures.size));