>假设我有一个相当标准的生产者/消费者问题需要在Scala中编码,具有这种结构:
- 构造延迟生成元素的
Stream
或Iterator
。 - 在
Stream
或Iterator
上使用map
或foreach
来处理这些元素并对其进行处理。
这似乎工作得很好,但它看起来像是单线程的:当我们想要处理一个新元素时,我们要求生成它,在生成它之后,我们开始处理它。我真正想要的是一种机制,以便在处理前一个元素时继续生成。有没有办法让Scala做到这一点?
我知道我可以使用BlockingQueue
,但这对我来说似乎非常必要。我希望有一种方法可以让Stream
继续在另一个线程上生成元素。
当然,一旦我们提前生成它们,它就不再是懒惰的评估了。但是,我也不想提前生成整个流的急切评估。我想要BlockingQueue
的类似物,但在功能范式中。
您可以将流中的项目映射到未来的处理,如下所示:
def process(x: Int): Int = // do something time consuming
val asyncProducer = Stream.from(0).map(x => future { process(x)})
现在这不会产生任何内容,因为 Stream 不会生成项目,直到您尝试实现它们,就像您建议流工作一样。因此,如果您想触发接下来 10 个项目的处理,您可以像这样简单地实现它们:
val futureResults = asyncProducer.take(10).toList
这将启动 10 个并行进程(取决于您在范围内的 ExecutionContext(并生成一个List[Future[Int]]
。为了能够接收所有这些工作项,您可以排序到未来列表到未来的列表:
val futureResult = Future.sequence(futureResults)
现在,您可以映射以获取结果列表,并将其交给某个收件人并开始下一个处理块。