用于 Scala 流生产和处理的多线程



>假设我有一个相当标准的生产者/消费者问题需要在Scala中编码,具有这种结构:

  1. 构造延迟生成元素的StreamIterator
  2. StreamIterator上使用mapforeach来处理这些元素并对其进行处理。

这似乎工作得很好,但它看起来像是单线程的:当我们想要处理一个新元素时,我们要求生成它,在生成它之后,我们开始处理它。我真正想要的是一种机制,以便在处理前一个元素时继续生成。有没有办法让Scala做到这一点?

我知道我可以使用BlockingQueue,但这对我来说似乎非常必要。我希望有一种方法可以让Stream继续在另一个线程上生成元素。

当然,一旦我们提前生成它们,它就不再是懒惰的评估了。但是,我也不想提前生成整个流的急切评估。我想要BlockingQueue的类似物,但在功能范式中。

您可以将流中的项目映射到未来的处理,如下所示:

def process(x: Int): Int = // do something time consuming
val asyncProducer = Stream.from(0).map(x => future { process(x)})

现在这不会产生任何内容,因为 Stream 不会生成项目,直到您尝试实现它们,就像您建议流工作一样。因此,如果您想触发接下来 10 个项目的处理,您可以像这样简单地实现它们:

val futureResults = asyncProducer.take(10).toList

这将启动 10 个并行进程(取决于您在范围内的 ExecutionContext(并生成一个List[Future[Int]]。为了能够接收所有这些工作项,您可以排序到未来列表到未来的列表:

val futureResult = Future.sequence(futureResults)

现在,您可以映射以获取结果列表,并将其交给某个收件人并开始下一个处理块。

相关内容

  • 没有找到相关文章