我有一些CompletableFuture
,我想并行运行它们,等待第一个正常返回。
我知道我可以使用CompletableFuture.anyOf
来等待第一个返回,但这会正常或例外地返回。我想忽略异常。
List<CompletableFuture<?>> futures = names.stream().map(
(String name) ->
CompletableFuture.supplyAsync(
() ->
// this calling may throw exceptions.
new Task(name).run()
)
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{}));
try {
logger.info(any.get().toString());
} catch (Exception e) {
e.printStackTrace();
}
您可以使用以下帮助程序方法:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
l.forEach(s -> s.thenAccept(complete));
return f;
}
您可以像这样使用它来证明它将忽略早期的异常,但返回第一个提供的值:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> { throw new RuntimeException("failing immediately"); }
),
CompletableFuture.supplyAsync(
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return "with 5s delay";
}),
CompletableFuture.supplyAsync(
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
return "with 10s delay";
})
);
CompletableFuture<String> c = anyOf(futures);
logger.info(c.join());
这种解决方案的一个缺点是,如果所有期货都异常完成,它将永远不会完成。如果计算成功,它将提供第一个值,但如果根本没有成功的计算,则会异常失败的解决方案涉及更多:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
CompletableFuture.allOf(
l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
return f;
}
它利用了这样一个事实,即allOf
的异常处理程序仅在所有期货完成(异常或否(之后被调用,并且未来只能完成一次(撇开像obtrude…
这样的特殊事情(。执行异常处理程序时,任何使用结果完成未来(如果有(的尝试都已完成,因此只有在以前没有成功完成的情况下,异常完成它的尝试才会成功。
它可以以与第一个解决方案完全相同的方式使用,并且只有在所有计算都失败时才表现出不同的行为,例如:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> { throw new RuntimeException("failing immediately"); }
),
CompletableFuture.supplyAsync(
// delayed to demonstrate that the solution will wait for all completions
// to ensure it doesn't miss a possible successful computation
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
throw new RuntimeException("failing later"); }
)
);
CompletableFuture<String> c = anyOf(futures);
try { logger.info(c.join()); }
catch(CompletionException ex) { logger.severe(ex.toString()); }
上面的示例使用延迟来演示解决方案在没有成功时将等待所有完成,而 ideone 上的此示例将演示以后的成功如何将结果转化为成功。请注意,由于 Ideones 缓存结果,您可能不会注意到延迟。
请注意,如果所有期货都失败,则无法保证哪些例外情况将被报告。由于它会等待错误情况下的所有完成,因此任何完成都可能达到最终结果。
考虑到:
-
Java哲学的基础之一是防止或阻止不良的编程实践。
(它在多大程度上成功地做到了这一点是另一个争论的主题;不可否认,这仍然是该语言的主要目标之一。
-
忽略异常是一种非常糟糕的做法。
异常应始终重新抛出到上面的层,或处理,或至少报告。具体来说,异常永远不应该被默默吞噬。
-
应尽早报告错误。
例如,查看运行时为了提供快速失败迭代器所经历的痛苦,如果在迭代时修改集合,则会抛出 ConcurrentModificationException。
-
忽略异常完成的
CompletableFuture
意味着 a( 您没有尽早报告错误,以及 b( 您可能计划根本不报告错误。 -
无法简单地等待第一个非异常完成,而不得不被异常完成所困扰,这不会造成任何重大负担,因为您始终可以从列表中删除异常完成的项目(同时不要忘记报告失败,对吧?(并重复等待。
因此,如果 Java 中故意缺少所寻求的功能,我不会感到惊讶,我愿意争辩说它理所当然地缺失了。
(对不起,索蒂里奥斯,没有规范的答案。
嗯,这是框架应该支持的方法。首先,我认为CompletionStage.applyToBoth做了类似的事情,但事实证明它没有。所以我想出了这个解决方案:
public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
final int count = stages.size();
if (count <= 0) {
throw new IllegalArgumentException("stages must not be empty");
}
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<U> future = new CompletableFuture<U>();
BiConsumer<U, Throwable> consumer = (val, exc) -> {
if (exc == null) {
future.complete(val);
} else {
if (settled.incrementAndGet() >= count) {
// Complete with the last exception. You can aggregate all the exceptions if you wish.
future.completeExceptionally(exc);
}
}
};
for (CompletionStage<U> item : stages) {
item.whenComplete(consumer);
}
return future;
}
要查看它的实际效果,这里有一些用法:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class Main {
public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
final int count = stages.size();
if (count <= 0) {
throw new IllegalArgumentException("stages must not be empty");
}
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<U> future = new CompletableFuture<U>();
BiConsumer<U, Throwable> consumer = (val, exc) -> {
if (exc == null) {
future.complete(val);
} else {
if (settled.incrementAndGet() >= count) {
// Complete with the last exception. You can aggregate all the exceptions if you wish.
future.completeExceptionally(exc);
}
}
};
for (CompletionStage<U> item : stages) {
item.whenComplete(consumer);
}
return future;
}
private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
public static <U> CompletionStage<U> delayed(final U value, long delay) {
CompletableFuture<U> future = new CompletableFuture<U>();
worker.schedule(() -> {
future.complete(value);
}, delay, TimeUnit.MILLISECONDS);
return future;
}
public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) {
CompletableFuture<U> future = new CompletableFuture<U>();
worker.schedule(() -> {
future.completeExceptionally(value);
}, delay, TimeUnit.MILLISECONDS);
return future;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("Started...");
/*
// Looks like applyToEither doesn't work as expected
CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
*/
try {
List<CompletionStage<Integer>> futures = new ArrayList<>();
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
futures.add(delayed(1, 1000));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
futures.add(delayed(2, 500));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
Integer value = firstCompleted(futures).toCompletableFuture().get();
System.out.println("Completed normally: " + value);
} catch (Exception ex) {
System.out.println("Completed exceptionally");
ex.printStackTrace();
}
try {
List<CompletionStage<Integer>> futures = new ArrayList<>();
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
Integer value = firstCompleted(futures).toCompletableFuture().get();
System.out.println("Completed normally: " + value);
} catch (Exception ex) {
System.out.println("Completed exceptionally");
ex.printStackTrace();
}
System.out.println("End...");
}
}
我发现Vertx - CompositeFuture.any 方法在这种情况下非常有用。它是为完全相同的情况而设计的。当然,你必须用户顶点定义未来。Vertx CompositeFuture API Docs
对上面的代码进行了一些更改,允许测试第一个结果是否是预期的。
public class MyTask implements Callable<String> {
@Override
public String call() throws Exception {
int randomNum = ThreadLocalRandom.current().nextInt(5, 20 + 1);
for (int i = 0; i < randomNum; i++) {
TimeUnit.SECONDS.sleep(1);
}
return "MyTest" + randomNum;
}
}
public class CompletableFutureUtils {
private static <T> T resolve(FutureTask<T> futureTask) {
try {
futureTask.run();
return futureTask.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static <V> boolean predicate(Predicate<V> predicate, V v) {
try {
return predicate.test(v);
} catch (Exception e) {
return false;
}
}
public static <T> void cancel(List<FutureTask<T>> futureTasks) {
if (futureTasks != null && futureTasks.isEmpty() == false) {
futureTasks.stream().filter(f -> f.isDone() == false).forEach(f -> f.cancel(true));
}
}
public static <V> CompletableFuture<V> supplyAsync(List<FutureTask<V>> futureTasks, Predicate<V> predicate) {
return supplyAsync(futureTasks, predicate, null);
}
public static <V> CompletableFuture<V> supplyAsync(List<FutureTask<V>> futureTasks, Predicate<V> predicate,
Executor executor) {
final int count = futureTasks.size();
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<V> result = new CompletableFuture<V>();
final BiConsumer<V, Throwable> action = (value, ex) -> {
settled.incrementAndGet();
if (result.isDone() == false) {
if (ex == null) {
if (predicate(predicate, value)) {
result.complete(value);
cancel(futureTasks);
} else if (settled.get() >= count) {
result.complete(null);
}
} else if (settled.get() >= count) {
result.completeExceptionally(ex);
}
}
};
for (FutureTask<V> futureTask : futureTasks) {
if (executor != null) {
CompletableFuture.supplyAsync(() -> resolve(futureTask), executor).whenCompleteAsync(action, executor);
} else {
CompletableFuture.supplyAsync(() -> resolve(futureTask)).whenCompleteAsync(action);
}
}
return result;
}
}
public class DemoApplication {
public static void main(String[] args) {
List<FutureTask<String>> tasks = new ArrayList<FutureTask<String>>();
for (int i = 0; i < 2; i++) {
FutureTask<String> task = new FutureTask<String>(new MyTask());
tasks.add(task);
}
Predicate<String> test = (s) -> true;
CompletableFuture<String> result = CompletableFutureUtils.supplyAsync(tasks, test);
try {
String s = result.get(20, TimeUnit.SECONDS);
System.out.println("result=" + s);
} catch (Exception e) {
e.printStackTrace();
CompletableFutureUtils.cancel(tasks);
}
}
}
调用CompletableFutureUtils.cancel(tasks);
非常重要,因此当发生超时时,它将取消后台任务。