反应器(或Akka)解决方案的可替代未来解决方案



我有以下方法使用这样的CompletableFuture

public AClass aMethod() {
    CompletableFuture<SomeClassA> someClassAFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsA());
    CompletableFuture<SomeClassB> someClassBFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsB());
    CompletableFuture<SomeClassC> someClassCFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsC());
    CompletableFuture.allOf(someClassAFuture, someClassBFuture, someClassCFuture).join();
    return new AClass(someClassAFuture.join(), someClassBFuture.join(), someClassCFuture.join());
}

如果 fork join 池中的线程少于 T * 3 个线程,则当T线程同时进入该方法时,此代码会出现死锁问题(因为所有allOf调用都无法完成,并且它们不会将当前获取的线程返回到池中(。

我发现解决这个问题的唯一方法是限制方法内同时存在的线程(使用 Spring 的 @Async 注释和线程执行器(或增加 fork join 池中的线程。

我想要一些更好的解决方案,我可以完全忘记线程池大小。如何使用 Reactor 或 Akka 重写它?

Akka 期货中的实现是这样的(完全未经测试(:

Future< SomeClassA > f1 = future(() -> someMethodThatReturnsA(), system.dispatcher());
Future< SomeClassB > f2 = future(() -> someMethodThatReturnsB(), system.dispatcher());
Future< SomeClassC > f3 = future(() -> someMethodThatReturnsC(), system.dispatcher());
List<Future<Object>> futures = Arrays.asList(f1, f2, f3);
return sequence(futures).map((results) ->  new AClass(results.get(0),results.get(1),results.get(2)));

在创建AClass之前,可能需要一些额外的工作来解析期货结果。请注意,您现在在aMethod中返回Future< AClass >

但是,代码的问题在于它阻塞了。您是否尝试使用thenApplythenCompose加入所有CompletableFutures以返回CompletableFuture<AClass>

相关内容

最新更新