在设计异步 Java API 时,如何确保整个 CompletableFuture 链由内部线程池执行?



我正在编写一个库,它提供了几个返回CompletableFutures的异步方法。该库有一个内部线程池,用于执行异步方法的计算工作。

我想确保满足以下两个要求:

  1. 返回的 CompletableFuture 不是由内部线程完成的,因此我的库外部的 CompletableFuture 链永远不会由库的内部线程池执行
  2. 我的异步方法的所有计算都由内部线程池执行,而不是由用户线程(即方法调用者的线程(执行

所以假设库有以下阻塞方法

Data request(Address address) {
Message request = encode(address);
Message response = sendAndReceive(request);
Data responseData = decode(response);
return responseData;
}

和相应的异步方法

CompletableFuture<Data> requestAsync(Address address) {
return CompletableFuture.supplyAsync(() -> encode(address), internalThreadPool)
.thenCompose(request -> sendAndReceiveAsync(request))
.thenApply(response -> decode(response));
}

第一个要求是通过添加链接.whenCompleteAsync((v,t) -> {})来满足的,如本答案中所述。

但是,要满足第二个要求,需要做些什么呢?

Sergey Kuksenko 在这里讨论了第二个需求的解决方案,并在 Java 11 的 HttpClient 实现中实现了。

不满足此要求,因为不能保证decode(response)由内部线程执行。如果编码和sendAndReceiveAsync快速完成,decode实际上可以由调用方的线程执行。

这个问题可以通过引入一个启动CF链的CompletableFuturestartCf来解决。

因此,完整的解决方案可能如下所示

CompletableFuture<Data> requestAsyncFixedAll(Address address) {
CompletableFuture<Void> startCf = new CompletableFuture<>();
CompletableFuture<Data> dataCf =  startCf.thenApplyAsync(v -> encode(address), internalThreadPool)
.thenCompose(request -> sendAndReceiveAsync(request))
.thenApply(response -> decode(response)).whenCompleteAsync((v, t) -> {});
startCf.complete(null);
return dataCf;
}

相关内容

  • 没有找到相关文章

最新更新