我有一个类型为 T
的实体列表。我还有一个功能接口,它充当Supplier
它的方法可以在实体上performTask
并发回结果R
如下所示:
R performTask(T entity) throws Exception
.
我想过滤两者:成功的结果和由此产生的错误和异常到单独的地图上。我在这里写的代码需要时间,请建议可以做什么。
我正在实体列表上循环,然后一个接一个地处理它们可完成的未来,我认为这不是正确的方法。你们能建议在这里做什么吗?
private void updateResultAndExceptionMaps(List < T > entities, final TaskProcessor < T, R > taskProcessor) {
ExecutorService executor = createExecutorService();
Map < T, R > outputMap = Collections.synchronizedMap(new HashMap < T, R > ());
Map < T, Exception > errorMap = new ConcurrentHashMap < T, Exception > ();
try {
entities.stream()
.forEach(entity -> CompletableFuture.supplyAsync(() -> {
try {
return taskProcessor.performTask(entity);
} catch (Exception e) {
errorMap.put(entity, (Exception) e.getCause());
LOG.error("Error processing entity Exception: " + entity, e);
}
return null;
}, executor)
.exceptionally(throwable -> {
errorMap.put(entity, (Exception) throwable);
LOG.error("Error processing entity Throwable: " + entity, throwable);
return null;
})
.thenAcceptAsync(R -> outputMap.put(entity, R))
.join()
); // end of for-each
LOG.info("outputMap Map -> " + outputMap);
LOG.info("errorMap Map -> " + errorMap);
} catch (Exception ex) {
LOG.warn("Error: " + ex, ex);
} finally {
executor.shutdown();
}
}
outputmap
应包含实体和结果,R
。
errorMap
应包含实体和Exception
。
这是因为您逐个迭代List
实体,创建CompletableFuture
对象并立即阻止迭代,因为join
方法会等到给定处理器完成工作或抛出异常。您可以通过将每个实体转换为 CompletableFuture
、收集所有CompletableFuture
实例,然后等待每个实例上的所有调用join
来获得完整的多线程支持。
下面的代码应该可以在您的情况下解决问题:
entities.stream()
.map(entity -> CompletableFuture.supplyAsync(() -> {
try {
return taskProcessor.performTask(entity);
} catch (Exception e) {
errorMap.put(entity, (Exception) e.getCause());
}
return null;
}, executor)
.exceptionally(throwable -> {
errorMap.put(entity, (Exception) throwable);
return null;
})
.thenAcceptAsync(R -> outputMap.put(entity, R))
).collect(Collectors.toList())
.forEach(CompletableFuture::join);