我在玩弄Java8的流和CompletableFuture
。我已有的代码有一个类,它接受一个URL并下载它:
public class FileDownloader implements Runnable {
private URL target;
public FileDownloader(String target) {
this.target = new URL(target);
}
public void run() { /* do it */ }
}
现在,这个类从发出List<String>
(单个主机上的多个目标)的另一部分获取信息。
我已经将周围的代码切换为CompletableFuture
:
public class Downloader {
public static void main(String[] args) {
List<String> hosts = fetchTargetHosts();
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync((files) -> {
for (String target : files) {
new FileDownloader(target).run();
}
});
}
}
public static class HostDownloader implements Supplier<List<String>> {
/* not shown */
}
/* My implementation should either be Runnable or Consumer.
Please suggest based on a idiomatic approach to the main loop.
*/
public static class FileDownloader implements Runnable, Consumer<String> {
private String target;
public FileDownloader(String target) {
this.target = target;
}
@Override
public void run() { accept(this.target); }
@Override
public void accept(String target) {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
}
}
}
现在,这感觉不自然。我产生了一串String
,我的FileDownloader
每次消耗一个。是否有现成的使我的单值Consumer
与List
s一起工作,或者我在这里坚持使用for
循环?
我知道将循环移动到accept
中只是创建一个Consumer<List<String>>
是微不足道的,这不是重点。
将两个直接依赖的步骤分解为两个异步步骤是没有意义的。他们仍然相互依赖,如果分离有任何影响,那也不会是积极的。
你可以直接使用
List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts)
CompletableFuture.runAsync(()->
new HostDownloader(host).get().forEach(fileDownloader));
或者,假设FileDownloader
没有关于下载的可变状态:
for(String host: hosts)
CompletableFuture.runAsync(()->
new HostDownloader(host).get().parallelStream().forEach(fileDownloader));
这仍然具有与使用supplyAsync
+ thenAcceptAsync
的原始方法相同的并发级别,仅仅是因为这两个依赖步骤无论如何都不能并发运行,因此简单的解决方案是将这两个步骤放入一个简洁的操作中,将异步执行。
然而,在这一点上值得注意的是,不建议在此操作中完全使用
CompletableFuture
。正如它的文档所述:
- 所有没有显式Executor参数的async方法都使用
执行ForkJoinPool.commonPool()
公共池的问题在于,其预配置的并发级别取决于CPU内核的数量,如果线程在I/O操作期间被阻塞,则不会进行调整。换句话说,它不适合I/O操作。
与Stream
不同,CompletableFuture
允许您为异步操作指定Executor
,因此您可以配置自己的Executor
以适合I/O操作,另一方面,当您处理Executor
时,根本不需要CompletableFuture
,至少不需要这样简单的任务:
List<String> hosts = fetchTargetHosts();
int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
for(String target: new HostDownloader(host).get())
connEs.execute(()->fileDownloader.accept(target));
});
在这里,您可以考虑将FileDownloader.accept
的代码内联到lambda表达式中,或者将其还原为Runnable
,以便您可以将内部循环的语句更改为connEs.execute(new FileDownloader(target))
。
可以是:
CompletableFuture.supplyAsync(worker)
.thenApply(list -> list.stream().map(FileDownloader::new))
.thenAccept(s -> s.forEach(FileDownloader::run));
我认为你需要这样做forEach:
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future =
CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync(files ->
files.stream()
.forEach(target -> new FileDownloader(target).run())
);
}
顺便说一下,你可以对主循环做同样的事情…
编辑:由于OP编辑了原始帖子,添加了FileDownloader的实现细节,我正在相应地编辑我的答案。Java 8函数式接口允许使用lambda expr来代替具体的Class。因此,"利用"Java 8消费者意味着用accept的代码替换FileDownloader,就像这样:
for (String host : hosts) {
HostDownloader worker = new HostDownloader(host);
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(worker);
future.thenAcceptAsync(files ->
files.forEach(target -> {
try (Writer output = new FileWriter("/tmp/blubb")) {
output.write(new URL(target).getContent().toString());
} catch (IOException e) { /* just for demo */ }
})
);
}