我正在编写一个库,它提供了几个返回CompletableFutures的异步方法。该库有一个内部线程池,用于执行异步方法的计算工作。
我想确保满足以下两个要求:
- 返回的 CompletableFuture 不是由内部线程完成的,因此我的库外部的 CompletableFuture 链永远不会由库的内部线程池执行
- 我的异步方法的所有计算都由内部线程池执行,而不是由用户线程(即方法调用者的线程(执行
所以假设库有以下阻塞方法
Data request(Address address) {
Message request = encode(address);
Message response = sendAndReceive(request);
Data responseData = decode(response);
return responseData;
}
和相应的异步方法
CompletableFuture<Data> requestAsync(Address address) {
return CompletableFuture.supplyAsync(() -> encode(address), internalThreadPool)
.thenCompose(request -> sendAndReceiveAsync(request))
.thenApply(response -> decode(response));
}
第一个要求是通过添加链接.whenCompleteAsync((v,t) -> {})
来满足的,如本答案中所述。
但是,要满足第二个要求,需要做些什么呢?
Sergey Kuksenko 在这里讨论了第二个需求的解决方案,并在 Java 11 的 HttpClient 实现中实现了。
不满足此要求,因为不能保证decode(response)
由内部线程执行。如果编码和sendAndReceiveAsync
快速完成,decode
实际上可以由调用方的线程执行。
这个问题可以通过引入一个启动CF链的CompletableFuture
startCf
来解决。
因此,完整的解决方案可能如下所示
CompletableFuture<Data> requestAsyncFixedAll(Address address) {
CompletableFuture<Void> startCf = new CompletableFuture<>();
CompletableFuture<Data> dataCf = startCf.thenApplyAsync(v -> encode(address), internalThreadPool)
.thenCompose(request -> sendAndReceiveAsync(request))
.thenApply(response -> decode(response)).whenCompleteAsync((v, t) -> {});
startCf.complete(null);
return dataCf;
}