使用并行流执行独立任务?



我有一个任务列表。每个任务都是相互独立的(它们不使用彼此的结果(。

当有 1000 个任务并使用顺序流来处理这些任务时。

tasks.forEach(task->{
            // long running task
            task.run();
            System.out.println("Thread: " + Thread.currentThread().getName());
        });

..然后,第二个任务在第一个任务之后运行,依此类推。循环以阻塞和顺序模式运行(第二个任务仅在第一个任务完成后完成(。

并行处理每个任务的最佳方法是什么?

这是最好的方法吗?

tasks.parallelStream().forEach(task->{
            // long running task
            task.run();
            System.out.println("Thread: " + Thread.currentThread().getName());
        });

根据我是否应该尽可能始终使用并行流?,应避免使用并行流。与我的情况一样,这些任务彼此独立,我不需要使用 parallelStream() 带来的同步开销。但是,在使用 parallelStream() 时,没有禁用同步开销的选项。或?

对于我的用例,有没有比parallelStream()更好的方法?

在 Java 8 中,parallelStream()使用在 JVM 启动时初始化的ForkJoinCommonPool,其中包含固定数量的线程,这些线程更适合可以遵循"分而治之"范式的工作。在您的情况下,由于它们都是隔离的,因此使用ExecutorService可能更合适。

一个好的解决方案是使用 CompletableFuture.allOf .像这样使用它:

ExecutorService ex = //Whatever executor you want;
CompletableFuture.allOf((CompletableFuture<Void>[]) tasks.stream()
        .map(task -> CompletableFuture.runAsync((() -> /* Do task */), ex))
        .toArray());

在此过程中,您可以执行异步、非阻塞。此外,您将收到有关类型转换的编译器警告,但我认为在您的情况下,忽略它是安全的。

ExecutorService.submit会触发任务,但是当您使用get获取任何结果时,它将阻止然后检索。 获取数据时,CompletableFuture不会阻止。当您希望在所有并行任务完成后查看返回的某种结果时,就是这种情况。
可以在此处找到更多解释。

另外,在您的原始问题中,您问使用parallelStream是否是个好主意,我的回答是这不是一个好主意,因为如果有一个任务阻塞线程,那么您将遇到问题(假设您在代码中到处都使用了parallelStream(。

此外,CompletableFuture可以接受它自己的线程池(您可以自己自定义(并在那里运行。请注意上述代码中要runAsync的第二个参数。

如果你只是想有一个即发即弃的机制,而不关心结果,那么使用ExecutorService.invokeAll是一个很好的方法。你可以像这样使用它:

 executorService.invokeAll(tasks.stream().map(task -> new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    // run task;
                    return null;
                }
            })
.collect(Collectors.toList()));  

但是,在这种情况下,为什么要使用带有自己ExecutorServiceCompletableFuture呢?
一个很好的原因是流畅的错误处理。你可以在这里和这里看到一些例子

最新更新