Java's CompletableFuture and Threads



我想使用异步模式启动CompletableFutureJava 8-9的线程,这些是我的类和线程:

我有 3 个线程。我的类包含单个方法myMethod()

Class_1 class_1 = new Class_1();
Class_2 class_2 = new Class_2(); 
Class_3 class_3 = new Class_3();

设置我的Runnable如下:

Runnable runnableClass_1 = new Runnable(){
public void run(){
class_1.myMethod();
try { Thread.sleep(0); } catch (InterruptedException e) { e.printStackTrace(); }
}
};
Runnable runnableClass_2 = new Runnable(){
public void run(){
class_2.myMethod();
try { Thread.sleep(0); } catch (InterruptedException e) { e.printStackTrace(); }
}
};
Runnable runnableClass_3 = new Runnable(){
public void run(){
class_3.myMethod();
try { Thread.sleep(0); } catch (InterruptedException e) { e.printStackTrace(); }
}
};  

创建线程 :

Thread t_1 = new Thread( runnableClass_1 );
Thread t_2 = new Thread( runnableClass_2 );
Thread t_3 = new Thread( runnableClass_3 );

最后,我的问题是如何使用异步模式启动这三个线程CompletableFuture

以下是实现相同方法的方法:

List<String> results = new ArrayList<String>();
CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> {
pauseSeconds(2);
results.add("first task");
}, service);
CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> {
pauseSeconds(3);
results.add("second task");
}, service);
CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,
() -> results.add(results.get(0)+ "&"+results.get(1)),service);
pauseSeconds(4);
System.out.println("finisher.isDone() = "+finisher.isDone());
System.out.println("results.get(2) = "+results.get(2));
//       assert(finisher.isDone(), is(true));
//       assertThat(results.get(2),is("first task&second task"));
}
public static void pauseSeconds(int num){
try {
Thread.sleep(num);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

你如何设置(并可能组合(你的未来取决于你的用例:这些未来是相互依赖的吗?您需要按顺序执行它们还是可以并行运行它们?您是关心所有三个结果,还是只需要先完成的未来?

根据您的答案,您可以使用flatMap/bind组合器(它们有不同的CompletableFuture名称,但您可以计算出来(按顺序链接您的期货,或者您可以从当前线程生成所有期货(让它们并行运行(,然后等待所有期货完成。您还可以指定一个特定的线程池来CompletableFuture工厂方法,只使用默认值(ForkJoinPool(。

所有这些都可以通过vavr提供的Future的monadic版本非常简洁地完成。但是,如果您查看其文档,也可以使用CompletableFuture提出解决方案。

更新/请求的示例

下面的例子基本上取自 Java 8 in Action github 存储库,其中提供的期货并行运行,并且所有期货的结果都累积到一个集合中。你所做的是,你把一个List<Future<T>>变成一个Future<List<T>>

final long startTime = System.currentTimeMillis();
final CompletableFuture<String> foo = CompletableFuture.supplyAsync(() -> {
final long timeout = 500;
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("foo on %s sleeping for %s", Thread.currentThread(), timeout));
return "foo";
});
final CompletableFuture<String> bar = CompletableFuture.supplyAsync(() -> {
final long timeout = 100;
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("bar on %s sleeping for %s", Thread.currentThread(), timeout));
return "bar";
});
final CompletableFuture<String> baz = CompletableFuture.supplyAsync(() -> {
final long timeout = 1000;
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("baz on %s sleeping for %s", Thread.currentThread(), timeout));
return "baz";
});
CompletableFuture
.supplyAsync(() -> Stream.of(foo, bar, baz).map(future -> future.join()).collect(Collectors.toList()))
.thenAccept(done -> System.out.println(String.format("Done with all futures %s", done)))
.thenRun(() -> System.out.println(String.format("Running all futures in parallel took %s millis", System.currentTimeMillis() - startTime)));

输出应如下所示:

bar on Thread[ForkJoinPool.commonPool-worker-2,5,main] sleeping for 100
foo on Thread[ForkJoinPool.commonPool-worker-9,5,main] sleeping for 500
baz on Thread[ForkJoinPool.commonPool-worker-11,5,main] sleeping for 1000
Done with all futures [foo, bar, baz]
Running all futures in parallel took 1007 millis

最新更新