可压缩未来:几项任务



如何使用 5 个 CompletableFutures 异步执行 20 个可运行任务(或 1 个任务 20 次)?

这就是我得到的:

Runnable task = () -> {
long startTime = System.currentTimeMillis();
Random random = new Random();
while (System.currentTimeMillis() - startTime < 3000) {
DoubleStream.generate(() -> random.nextDouble())
.limit(random.nextInt(100))
.map(n -> Math.cos(n))
.sum();
}
System.out.println("Done");
};
for (int i = 0; i < 4; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
future1.get();
future2.get();
future3.get();
future4.get();
future5.get();
}

如果我执行此代码,我可以看到它只异步运行 3 个 future.get(): 3 然后是 1 for() 迭代期间剩下的 2

个所以,我想尽可能异步地完成所有 20 个任务

您可以使用 allOf 将多个任务作为一个任务同时运行。首先,我创建了 5 个任务的组合(与您的问题相同),但后来我添加了 10 个任务(并且只倾斜了两次),并获得了一半的执行时间。

for (int i = 0; i < 2; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
// and so on until ten  
CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);
CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);
combined.get();
}
CompletableFuture

的默认执行器是ForkJoinPool的公共池,其默认目标并行度与 CPU 内核数减去 1 相匹配。因此,如果您有四个内核,则最多三个作业将异步执行。由于每 5 个作业强制等待完成,因此将获得三个并行执行,然后在每个循环迭代中执行两个并行执行。

如果要获得特定的执行策略(如所选的并行性),最好的方法是指定正确配置的执行程序。然后,您应该让执行器管理并行性,而不是在循环中等待。

ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks

这允许五个并行作业,但将允许五个线程中的每一个在完成一个作业后立即选取一个新作业,而不是等待下一个循环迭代。

但是当你说

所以,我想尽可能异步地完成所有 20 个任务

目前尚不清楚为什么在安排五个作业后强制执行等待。最大并行度可通过以下方式实现

ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks

这可能会生成与作业一样多的线程,除非一个作业在计划所有作业之前完成,因为在这种情况下,工作线程可能会选取一个新作业。

但这种逻辑根本不需要CompletableFuture。您还可以使用:

ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();

但是,当您的工作不涉及 I/O 或任何其他类型的等待或释放 CPU 时,创建比 CPU 内核更多的线程是没有意义的。最好使用配置为处理器数量的池。

ExecutorService pool = Executors.newWorkStealingPool(
Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();

在您的特殊情况下,这可能会运行得更慢,因为您的作业使用系统时间在线程多于内核时看起来运行得更快,但实际上当时所做的工作更少。但对于普通的计算任务,这将提高性能。

将以下系统属性设置为希望公共分叉联接池使用的线程数:

java.util.concurrent.ForkJoinPool.common.parallelism 

参见 ForkJoinPool

原因是您在构建可完成期货时没有指定自己的分叉连接池,因此它隐式使用

ForkJoinPool.commonPool()

参见 CompletableFurure

最新更新