用返回Future的函数映射Stream



我有时会发现自己有一些Stream[X]function X => Future Y,我想把它们组合成Future[Stream[Y]],但似乎找不到方法。例如,我有

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)
val result : Future[Stream[String]] = ???

我试过

 val result = Future.Traverse(x, toFutureString)

它给出了正确的结果,但似乎在返回"未来"之前消耗了整个流,"未来"或多或少地击败了purpse

我试过

val result = x.flatMap(toFutureString)

但它不能用type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?] 编译

val result = x.map(toFutureString)

返回有点奇怪和无用的Stream[Future[String]]

我该怎么做才能把事情修好?

编辑:我不拘泥于Stream,我对Iterator上的相同操作同样满意,只要它不会在开始处理头部之前阻止评估所有项目

Edit2:我不能100%确定Future.Traverse构造在返回Future[stream]之前需要遍历整个流,但我认为它确实需要。如果没有,这本身就是一个很好的答案。

Edit3:我也不需要结果是有序的,我对返回的流或迭代器是任何顺序都很满意。

使用traverse是正确的,但不幸的是,在这种情况下,标准库的定义似乎有点崩溃——它不需要在返回之前消耗流。

Future.traverse是一个更通用的函数的特定版本,它适用于任何封装在"可遍历"类型中的应用函子(例如,请参阅这些论文或我在这里的回答以获取更多信息)。

Scalaz库提供了这个更通用的版本,在这种情况下它可以正常工作(请注意,我正在从scalaz-contrib获得Future的应用函子实例;它还不在Scalaz的稳定版本中,这些版本仍然是针对Scala2.9.2交叉构建的,Scala2.9.2没有这个Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._
import ExecutionContext.Implicits.global
def toFutureString(value: Int) = Future(value.toString)
val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

这会在无限流中立即返回,所以我们确信它不是首先消耗的。


作为一个脚注:如果你查看Future.traverse的源代码,你会发现它是根据foldLeft实现的,这很方便,但在流的情况下不是必要的或合适的。

忘记流:

import scala.concurrent.Future
import ExecutionContext.Implicits.global
val x = 1 to 10 toList
def toFutureString(value : Int) = Future {
  println("starting " + value)
  Thread.sleep(1000)
  println("completed " + value)
  value.toString
}

收益率(在我的8芯盒子上):

scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@2d9472e2
scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10

因此,它们中的8个会立即启动(每个核心一个,尽管这可以通过线程池执行器进行配置),然后随着它们的完成,更多的会启动。Future[List[String]]会立即返回,然后在暂停后开始打印那些"已完成的x"消息。

例如,当您有一个List[Url’s]和一个Url=>Future[HttpResponseBody]类型的函数时,可以使用它。您可以使用该函数对该列表调用Future.traverse,并并行启动这些http请求,返回一个结果列表。

是不是和你想要的一样?

由于Scalaz traverse()的现代版本表现不同,并试图在调用时消耗整个流,因此接受的答案不再有效。

至于这个问题,我想说,不可能以一种真正的无障碍方式实现这一点。

Stream[Y]可用之前,无法解析Future[Stream[Y]]。由于Y是由函数X => Future[Y]异步生成的,因此在遍历Stream[Y]时,如果不阻塞,就无法获得Y。这意味着,要么在解析Future[Stream[Y]]之前必须解析所有Future[Y](这需要消耗整个流),要么必须允许在遍历Stream[Y]时发生块(对于基础期货尚未完成的项目)。但是,如果我们允许对遍历进行阻塞,那么最终的未来的完成定义是什么?从这个角度来看,它可能与Future.successful(BlockingStream[Y])相同。这反过来在语义上等于原始Stream[Future[Y]]

换句话说,我认为这个问题本身就有问题。

相关内容

  • 没有找到相关文章

最新更新