用于创建CompletableFuture的自己的ExecutorService不会终止



我试图使用我自己的ExecutorService来创建一组CompletableFutures来链接几个过程步骤。这些步骤可能会抛出异常。

当他们这样做时,在我看来,ExecutorService中的线程没有被释放,尽管我试图处理这种情况。

class Scratch {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
AtomicInteger counter = new AtomicInteger();
Supplier<?> throwingException = () -> { throw new RuntimeException("throw " + counter.incrementAndGet()); };
Function<String, CompletableFuture<?>> process =
url -> CompletableFuture.supplyAsync(throwingException, executor)
.exceptionally(Scratch::log);
var collect = IntStream.range(1, 10).mapToObj(i -> "url" + i)
.map(process)
.toArray(CompletableFuture[]::new);
final CompletableFuture<Void> together = CompletableFuture.allOf(collect);
System.out.println("joining");
together.exceptionally(Scratch::log).join();
System.out.println("finished");
if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("exiting cleanly");
} else {
System.out.println("not terminated");
}
executor.submit(() -> System.out.println("still executing"));
}
static <T> T log(Throwable t) {
System.out.println(t.getMessage());
return null;
}
}

输出
java.lang.RuntimeException: throw 1
joining
java.lang.RuntimeException: throw 2
java.lang.RuntimeException: throw 3
java.lang.RuntimeException: throw 4
java.lang.RuntimeException: throw 5
java.lang.RuntimeException: throw 6
java.lang.RuntimeException: throw 7
java.lang.RuntimeException: throw 8
java.lang.RuntimeException: throw 9
finished
not terminated

由此启动的进程也不会终止(这是我注意到的)。

在我看来,这应该意味着没有线程留在ExecutorService在这一点上,但情况似乎并非如此;如果我们降低线程池容量,它仍然会运行所有提交的任务,如果我们在失败的终止后添加提交另一个任务(例如executor.submit(() -> System.out.println("still executing"));),它将被执行。

如果我们不把自己的ExecutorService传递给CompletableFutre::supplyAsync,进程将如期终止。

我还尝试了处理异常状态的其他版本(如使用together.whenComplete()),但结果相同。

为什么会发生这种情况,我如何确保ExecutorService正确终止?

编辑:我意识到这不是导致问题的异常,使用您自己的执行器服务提供给CompletableFuture的任何任务都会发生这种情况,考虑到Eugene的回复,这是完全有意义的。我正在更改题目。

这里发生了两件事。首先,当您在没有显式Executor的情况下执行时,您的操作在ForkJoinPool中运行。该池使用守护线程,它们不会停止虚拟机退出。所以当你的main结束时,VM存在。

第二点在awaitTermination的文档中,实际上是:

在关机请求后阻塞直到所有任务完成执行,或者发生超时,或者当前线程被中断,以先发生的为准。

由于您没有调用shutDown,并且该池创建了非守护进程线程,因此进程没有退出。

最新更新