异步填充 Java 映射并将其作为未来返回



我有一个对象的映射,创建这些对象的成本很高,因此我想创建对象并与应用程序中的其他进程并行填充映射。仅当主线程实际需要访问映射时,应用程序才应等待填充映射的异步任务完成。我怎样才能最优雅地完成这项工作?

当前的做法

目前,我能够使用类似于以下示例代码中的CompletableFuture.runAsync(Runnable, Executor)异步创建映射本身中的每个对象,但我不确定如何构建一个 Future/CompletableFuture 类型的机制,以便在准备就绪时返回Map本身:

public static class AsynchronousMapPopulator {
    private final Executor backgroundJobExecutor;
    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }
    public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
            incrementingJobs.add(incrementingJob);
        }
        // TODO: This blocks until the training is done; Instead, return a
        // future to the caller somehow
        CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).join();
        return result;
    }
}

但是,对于上面的代码,当代码调用 AsynchronousTest.create(Map<String,Integer) 时,它已经阻塞,直到该方法返回完全填充的ConcurrentMap<String,Integer>;我怎样才能把它变成一个类似Future<Map<String,Integer>>的东西,以便我以后可以使用它?

Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
...
// Do lots of other stuff
...
Map<String,Integer> completedModels = futureClassModels.get();

正如@Holger在他的评论中所说,你必须避免调用.join(),而是依赖thenApply(),例如:

public static class AsynchronousMapPopulator {
    private final Executor backgroundJobExecutor;
    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }
    public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
            incrementingJobs.add(incrementingJob);
        }
        // using thenApply instead of join here:
        return CompletableFuture.allOf(
                incrementingJobs.build().toArray(
                    CompletableFuture[]::new
                )
            ).thenApply(x -> result);
    }
}

最新更新