我有一个可完成的future,它只运行另一个可执行的future(通常需要大约2秒,超时50毫秒(,并等待它完成,超时1秒。
问题是内部future的超时永远不会工作,但get
工作了大约两秒,尽管它有50毫秒的超时,因此外部CompletableFuture超时。
sleepFor2000Ms
调用Thread.sleep(2000)
private static void oneCompletableFutureInsideAnother() throws InterruptedException, ExecutionException{
long time = System.nanoTime();
try{
System.out.println("2 started");
CompletableFuture.runAsync(() -> {
long innerTime = System.nanoTime();
try{
System.out.println("inner started");
CompletableFuture.runAsync(TestApplication::sleepFor2000Ms)
.get(50, TimeUnit.MILLISECONDS); // this get doesn't work
// it waits way longer, until the future completes successfully
System.out.println("inner completed successfully");
}catch(InterruptedException | ExecutionException | TimeoutException e){
System.out.println("inner timed out");
}
long innerTimeEnd = System.nanoTime();
System.out.println("inner took " + (innerTimeEnd - innerTime)/1_000_000 + " ms");
}).get(1, TimeUnit.SECONDS);
System.out.println("2 completed successfully");
}catch(TimeoutException e){
System.out.println("2 timed out");
}
long endTime = System.nanoTime();
System.out.println("2 took " + (endTime - time)/1_000_000 + " ms");
}
预期输出如下(我在java8上得到了这个输入(:
2 started
inner started
inner timed out
inner took 61 ms
2 completed successfully
2 took 62 ms
实际输出是(我在java9和更高版本上得到的(:
2 started
inner started
2 timed out
2 took 1004 ms
inner completed successfully
inner took 2013 ms
如果我做同样的工作,但在单个CompletableFuture内,它会正确超时:
private static void oneCompletableFuture() throws InterruptedException, ExecutionException{
long time = System.nanoTime();
try{
System.out.println("1 started");
CompletableFuture.runAsync(TestApplication::sleepFor2000Ms)
.get(50, TimeUnit.MILLISECONDS); // this get works ok
// it waits for 50 ms and then throws TimeoutException
System.out.println("1 completed successfully");
}catch(TimeoutException e){
System.out.println("1 timed out");
}
long endTime = System.nanoTime();
System.out.println("1 took " + (endTime - time)/1_000_000 + " ms");
}
它是打算以这种方式工作,还是我做错了什么,或者可能是java库中的错误?
与Java 8版本不同,新版本的.get(50, TimeUnit.MILLISECONDS)
调用尝试执行一些其他挂起的任务,而不是阻塞调用方线程,没有考虑到它无法预测这些任务可能需要多长时间,因此,它可能会在多大程度上错过超时目标。当它碰巧接到它正在等待的任务时,结果就像根本没有超时一样。
当我将Thread.dumpStack();
添加到sleepFor2000Ms()
时,受影响的环境会打印类似的内容
java.lang.Exception: Stack trace
at java.base/java.lang.Thread.dumpStack(Thread.java:1380)
at TestApplication.sleepFor2000Ms(TestApplication.java:36)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpAsyncBlocker(ForkJoinPool.java:1253)
at java.base/java.util.concurrent.ForkJoinPool.helpAsyncBlocker(ForkJoinPool.java:2237)
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1933)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
at TestApplication.lambda$0(TestApplication.java:15)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
但请注意,这是一场比赛。这种情况并不总是发生。当我将内部代码更改为时
CompletableFuture<Void> inner
= CompletableFuture.runAsync(TestApplication::sleepFor2000Ms);
LockSupport.parkNanos(1_000_000);
inner.get(50, TimeUnit.MILLISECONDS);
超时可重复工作(尽管在重负载下仍可能失败(。
我找不到匹配的错误报告,但是ForkJoinTask
也有类似的问题,ForkJoinTask.get(timeout(可能永远等待。这个问题还没有解决。
我预计,当虚拟线程(又名项目Loom(成为现实时,这些问题将消失,因为那时,没有理由避免阻塞线程,因为底层的本地线程可以在没有这些怪癖的情况下重用。
在此之前,您通常应该避免阻塞工作线程。Java 8在工作线程被阻塞时启动补偿线程的策略扩展性不好,因此您正在用一个问题交换另一个问题。