我有一个任务列表。每个任务都是相互独立的(它们不使用彼此的结果(。
当有 1000 个任务并使用顺序流来处理这些任务时。
tasks.forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});
..然后,第二个任务在第一个任务之后运行,依此类推。循环以阻塞和顺序模式运行(第二个任务仅在第一个任务完成后完成(。
并行处理每个任务的最佳方法是什么?
这是最好的方法吗?
tasks.parallelStream().forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});
根据我是否应该尽可能始终使用并行流?,应避免使用并行流。与我的情况一样,这些任务彼此独立,我不需要使用 parallelStream()
带来的同步开销。但是,在使用 parallelStream()
时,没有禁用同步开销的选项。或?
对于我的用例,有没有比parallelStream()
更好的方法?
在 Java 8 中,parallelStream()
使用在 JVM 启动时初始化的ForkJoinCommonPool
,其中包含固定数量的线程,这些线程更适合可以遵循"分而治之"范式的工作。在您的情况下,由于它们都是隔离的,因此使用ExecutorService
可能更合适。
一个好的解决方案是使用 CompletableFuture.allOf
.像这样使用它:
ExecutorService ex = //Whatever executor you want;
CompletableFuture.allOf((CompletableFuture<Void>[]) tasks.stream()
.map(task -> CompletableFuture.runAsync((() -> /* Do task */), ex))
.toArray());
在此过程中,您可以执行异步、非阻塞。此外,您将收到有关类型转换的编译器警告,但我认为在您的情况下,忽略它是安全的。
ExecutorService.submit
会触发任务,但是当您使用get
获取任何结果时,它将阻止然后检索。 获取数据时,CompletableFuture
不会阻止。当您希望在所有并行任务完成后查看返回的某种结果时,就是这种情况。
可以在此处找到更多解释。
另外,在您的原始问题中,您问使用parallelStream
是否是个好主意,我的回答是这不是一个好主意,因为如果有一个任务阻塞线程,那么您将遇到问题(假设您在代码中到处都使用了parallelStream
(。
此外,CompletableFuture
可以接受它自己的线程池(您可以自己自定义(并在那里运行。请注意上述代码中要runAsync
的第二个参数。
如果你只是想有一个即发即弃的机制,而不关心结果,那么使用ExecutorService.invokeAll
是一个很好的方法。你可以像这样使用它:
executorService.invokeAll(tasks.stream().map(task -> new Callable<Void>() {
@Override
public Void call() throws Exception {
// run task;
return null;
}
})
.collect(Collectors.toList()));
但是,在这种情况下,为什么要使用带有自己ExecutorService
的CompletableFuture
呢?一个很好的原因是流畅的错误处理。你可以在这里和这里看到一些例子