我开始适应JavaCompletableFuture
组合,使用JavaScript承诺。基本上,组合程序只是在指定的执行器上调度链接命令。但我不确定哪个线程正在运行时,组成执行。
假设我有两个执行人,executor1
和executor2
;为简单起见,我们假设它们是单独的线程池。我安排了一个CompletableFuture
(使用一个非常松散的描述):
CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(this::getFoo, executor1);
完成后,我使用第二个执行器将Foo
转换为Bar
:
CompletableFuture<Bar> futureBar .thenApplyAsync(this::fooToBar, executor2);
我理解getFoo()
将从executor1
线程池中的线程调用。我理解fooToBar()
将从executor2
线程池中的线程调用。
但是实际合成使用的线程是什么,即在getFoo()
完成后,futureFoo()
完成;但是在fooToBar()
命令被调度到executor2
之前?换句话说,是哪个线程实际运行代码来调度第二个执行器上的第二个命令?
调度是否作为executor1
中称为getFoo()
的同一线程的一部分执行?如果是这样,这个可完成的未来组合是否等同于我在executor1
任务的第一个命令中手动调度fooToBar()
?
这是故意未指定的。在实践中,当没有Async
后缀的变体被调用并显示类似的行为时,它将由处理链接操作的相同代码处理。
CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
return "";
}, r -> new Thread(r, "A").start())
.thenAcceptAsync(s -> {}, r -> {
System.out.println("scheduled by " + Thread.currentThread());
new Thread(r, "B").start();
});
很可能会打印
scheduled by Thread[A,5,main]
作为完成前一阶段的线程,用于调度依赖操作。
但是当我们使用
时CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "",
r -> new Thread(r, "A").start());
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
first.thenAcceptAsync(s -> {}, r -> {
System.out.println("scheduled by " + Thread.currentThread());
new Thread(r, "B").start();
});
很可能会打印
scheduled by Thread[main,5,main]
当主线程调用thenAcceptAsync
时,第一个future已经完成,主线程将自己调度动作。
但这并不是故事的结尾。当我们使用
CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
return "";
}, r -> new Thread(r, "A").start());
Set<String> s = ConcurrentHashMap.newKeySet();
Runnable submitter = () -> {
String n = Thread.currentThread().getName();
do {
for(int i = 0; i < 1000; i++)
first.thenAcceptAsync(x -> s.add(n+" "+Thread.currentThread().getName()),
Runnable::run);
} while(!first.isDone());
};
Thread b = new Thread(submitter, "B");
Thread c = new Thread(submitter, "C");
b.start();
c.start();
b.join();
c.join();
System.out.println(s);
它可能不仅打印第一种场景中的B A
和C A
组合,也打印第二种场景中的B B
和C C
组合。在我的机器上,它还可重复地打印B C
和C B
的组合,表明一个线程传递给thenAcceptAsync
的操作由另一个线程同时以不同的操作调用thenAcceptAsync
提交给执行器。
这与这个答案中描述的线程计算传递给thenApply
(不包括Async
)的函数的场景相匹配。正如开头所说,这正是我所期望的,因为这两件事很可能由相同的代码处理。但是,与计算传递给thenApply
的函数的线程不同,在Executor
上调用execute
方法的线程甚至没有在文档中提到。因此,理论上,另一个实现可以使用一个完全不同的线程,将来不调用方法,也不完成它。
最后是一个简单的程序,它喜欢您的代码片段,并允许您使用它。
输出确认你提供的执行器被调用到complete(除非你显式地提前调用了complete——这可能发生在调用complete的线程中),当它等待的条件已经准备好时——Future上的get()阻塞直到Future完成。
提供一个参数-有一个执行器1和执行器2,不提供任何参数,只有一个执行器。输出是(相同的执行器—在相同的执行器中依次作为单独的任务运行)—
In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-1-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed
OR(两个执行器-事情再次顺序运行,但使用不同的执行器)-
In thread Thread[main,5,main] - getFoo
In thread Thread[main,5,main] - getFooToBar
In thread Thread[pool-1-thread-1,5,main] - Supplying Foo
In thread Thread[pool-2-thread-1,5,main] - fooToBar
In thread Thread[main,5,main] - Completed
记住:带有执行器(在本例中)的代码可以立即在另一个线程中启动。getFoo在设置FooToBar之前就被调用了。
代码如下-
package your.test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
public class TestCompletableFuture {
private static void dumpWhichThread(final String msg) {
System.err.println("In thread " + Thread.currentThread().toString() + " - " + msg);
}
private static final class Foo {
final int i;
Foo(int i) {
this.i = i;
}
};
public static Supplier<Foo> getFoo() {
dumpWhichThread("getFoo");
return new Supplier<Foo>() {
@Override
public Foo get() {
dumpWhichThread("Supplying Foo");
return new Foo(10);
}
};
}
private static final class Bar {
final String j;
public Bar(final String j) {
this.j = j;
}
};
public static Function<Foo, Bar> getFooToBar() {
dumpWhichThread("getFooToBar");
return new Function<Foo, Bar>() {
@Override
public Bar apply(Foo t) {
dumpWhichThread("fooToBar");
return new Bar("" + t.i);
}
};
}
public static void main(final String args[]) throws InterruptedException, ExecutionException, TimeoutException {
final TestCompletableFuture obj = new TestCompletableFuture();
obj.running(args.length == 0);
}
private String running(final boolean sameExecutor) throws InterruptedException, ExecutionException, TimeoutException {
final Executor executor1 = Executors.newSingleThreadExecutor();
final Executor executor2 = sameExecutor ? executor1 : Executors.newSingleThreadExecutor();
CompletableFuture<Foo> futureFoo = CompletableFuture.supplyAsync(getFoo(), executor1);
CompletableFuture<Bar> futureBar = futureFoo.thenApplyAsync(getFooToBar(), executor2);
try {
// Try putting a complete here before the get ..
return futureBar.get(50, TimeUnit.SECONDS).j;
}
finally {
dumpWhichThread("Completed");
}
}
}
哪个线程触发Bar阶段进行-在上面-它是executor1。一般来说,线程完成未来(即给它一个值)是什么释放的东西取决于它。如果你在主线程上立即完成FutureFoo,它将是触发它的那个。
所以你必须小心这个。如果你有&;n &;所有的事情都在等待未来的结果——但是只使用单线程执行器——那么第一个调度的执行器将阻塞该执行器,直到它完成。你可以推断出M个线程,N个未来——它可以衰变成"锁阻止了其他事情的进展。