如何立即将任务从一个线程池传送到另一个线程池?



我有一个输入元素列表,我想将其队列到多个线程池中。假设这是我的输入:

final List<Integer> ints = Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());

这些是我希望元素一个接一个地运行的三个函数:

final Function<Integer, Integer> step1 =
value -> { // input from the ints list
return value * 2;
};
final Function<Integer, Double> step2 =
value -> { // input from the previous step1
return (double) (value * 2); //
};
final Function<Double, String> step3 =
value -> { // input from the previous step2
return "Result: " + value * 2;
};

这些将是每个步骤的池:

final ExecutorService step1Pool = Executors.newFixedThreadPool(4);
final ExecutorService step2Pool = Executors.newFixedThreadPool(3);
final ExecutorService step3Pool = Executors.newFixedThreadPool(1);

我希望每个元素都贯穿step1Pool并应用step1.一旦一个元素完成,它的结果应该 最终以step2pool结束,以便step2可以在此处应用。一旦step2Pool某事完成,它应该是 排队step3Pool,应应用step3。 在我的主线程上,我想等到我得到step3的所有结果。处理每个元素的顺序 无所谓。只是它们都在正确的线程池上运行step1->step2->step3

基本上我想并行化Stream.map,将每个结果立即推送到下一个队列,然后等到我 从我上一个线程池中得到了ints.size()结果。

有没有一种简单的方法可以在 Java 中实现?

我相信CompletableFuture会在这里帮助你!

List<CompletableFuture<String>> futures = ints.stream()
.map(i -> CompletableFuture.supplyAsync(() -> step1.apply(i), step1Pool)
.thenApplyAsync(step2, step2Pool)
.thenApplyAsync(step3, step3Pool))
.collect(Collectors.toList());
List<String> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

最好使用流:

List<String> stringList = Stream.iterate(1, i -> i + 1)
.limit(100)
.parallel()
.map(step1)
.map(step2)
.map(step3)
.collect(Collectors.toList());

相关内容

  • 没有找到相关文章

最新更新