按顺序将列表值传递给单值消费者的最佳方式



我在玩弄Java8的流和CompletableFuture。我已有的代码有一个类,它接受一个URL并下载它:

public class FileDownloader implements Runnable {
    private URL target;
    public FileDownloader(String target) {
        this.target = new URL(target);
    }
    public void run() { /* do it */ }
}

现在,这个类从发出List<String>(单个主机上的多个目标)的另一部分获取信息。

我已经将周围的代码切换为CompletableFuture:

public class Downloader {
    public static void main(String[] args) {
        List<String> hosts = fetchTargetHosts();
        for (String host : hosts) {
            HostDownloader worker = new HostDownloader(host);
            CompletableFuture<List<String>> future = 
                CompletableFuture.supplyAsync(worker);
            future.thenAcceptAsync((files) -> {
                for (String target : files) {
                    new FileDownloader(target).run();
                }
            });
        }
    }
    public static class HostDownloader implements Supplier<List<String>> {
        /* not shown */ 
    }
    /* My implementation should either be Runnable or Consumer.
       Please suggest based on a idiomatic approach to the main loop.
     */
    public static class FileDownloader implements Runnable, Consumer<String> { 
        private String target;
        public FileDownloader(String target) {
            this.target = target;
        }
        @Override
        public void run() { accept(this.target); }
        @Override
        public void accept(String target) {
            try (Writer output = new FileWriter("/tmp/blubb")) {
                output.write(new URL(target).getContent().toString());
            } catch (IOException e) { /* just for demo */ }
        }
    }
}

现在,这感觉不自然。我产生了一串String,我的FileDownloader每次消耗一个。是否有现成的使我的单值ConsumerList s一起工作,或者我在这里坚持使用for循环?

我知道将循环移动到accept中只是创建一个Consumer<List<String>>是微不足道的,这不是重点。

将两个直接依赖的步骤分解为两个异步步骤是没有意义的。他们仍然相互依赖,如果分离有任何影响,那也不会是积极的。

你可以直接使用

List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().forEach(fileDownloader));

或者,假设FileDownloader没有关于下载的可变状态:

for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().parallelStream().forEach(fileDownloader));

这仍然具有与使用supplyAsync + thenAcceptAsync的原始方法相同的并发级别,仅仅是因为这两个依赖步骤无论如何都不能并发运行,因此简单的解决方案是将这两个步骤放入一个简洁的操作中,将异步执行。


然而,在这一点上值得注意的是,不建议在此操作中完全使用CompletableFuture。正如它的文档所述:
  • 所有没有显式Executor参数的async方法都使用ForkJoinPool.commonPool()
  • 执行

公共池的问题在于,其预配置的并发级别取决于CPU内核的数量,如果线程在I/O操作期间被阻塞,则不会进行调整。换句话说,它不适合I/O操作。

Stream不同,CompletableFuture允许您为异步操作指定Executor,因此您可以配置自己的Executor以适合I/O操作,另一方面,当您处理Executor时,根本不需要CompletableFuture,至少不需要这样简单的任务:

List<String> hosts = fetchTargetHosts();
int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs=Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs=Executors.newWorkStealingPool(concurrentConnections);
FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
    for(String target: new HostDownloader(host).get())
        connEs.execute(()->fileDownloader.accept(target));
});

在这里,您可以考虑将FileDownloader.accept的代码内联到lambda表达式中,或者将其还原为Runnable,以便您可以将内部循环的语句更改为connEs.execute(new FileDownloader(target))

可以是:

CompletableFuture.supplyAsync(worker)
                 .thenApply(list -> list.stream().map(FileDownloader::new))
                 .thenAccept(s -> s.forEach(FileDownloader::run));

我认为你需要这样做forEach:

for (String host : hosts) {
    HostDownloader worker = new HostDownloader(host);
    CompletableFuture<List<String>> future = 
            CompletableFuture.supplyAsync(worker);
    future.thenAcceptAsync(files -> 
            files.stream()
            .forEach(target -> new FileDownloader(target).run())
    );
}
顺便说一下,你可以对主循环做同样的事情…

编辑:由于OP编辑了原始帖子,添加了FileDownloader的实现细节,我正在相应地编辑我的答案。Java 8函数式接口允许使用lambda expr来代替具体的Class。因此,"利用"Java 8消费者意味着用accept的代码替换FileDownloader,就像这样:

for (String host : hosts) {
    HostDownloader worker = new HostDownloader(host);
    CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(worker);
    future.thenAcceptAsync(files -> 
            files.forEach(target -> {
                try (Writer output = new FileWriter("/tmp/blubb")) {
                    output.write(new URL(target).getContent().toString());
                } catch (IOException e) { /* just for demo */ }
            })
    );
}

最新更新