我有时会发现自己有一些Stream[X]
和function X => Future Y
,我想把它们组合成Future[Stream[Y]]
,但似乎找不到方法。例如,我有
val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)
val result : Future[Stream[String]] = ???
我试过
val result = Future.Traverse(x, toFutureString)
它给出了正确的结果,但似乎在返回"未来"之前消耗了整个流,"未来"或多或少地击败了purpse
我试过
val result = x.flatMap(toFutureString)
但它不能用type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]
编译
val result = x.map(toFutureString)
返回有点奇怪和无用的Stream[Future[String]]
我该怎么做才能把事情修好?
编辑:我不拘泥于Stream
,我对Iterator
上的相同操作同样满意,只要它不会在开始处理头部之前阻止评估所有项目
Edit2:我不能100%确定Future.Traverse构造在返回Future[stream]之前需要遍历整个流,但我认为它确实需要。如果没有,这本身就是一个很好的答案。
Edit3:我也不需要结果是有序的,我对返回的流或迭代器是任何顺序都很满意。
使用traverse
是正确的,但不幸的是,在这种情况下,标准库的定义似乎有点崩溃——它不需要在返回之前消耗流。
Future.traverse
是一个更通用的函数的特定版本,它适用于任何封装在"可遍历"类型中的应用函子(例如,请参阅这些论文或我在这里的回答以获取更多信息)。
Scalaz库提供了这个更通用的版本,在这种情况下它可以正常工作(请注意,我正在从scalaz-contrib
获得Future
的应用函子实例;它还不在Scalaz的稳定版本中,这些版本仍然是针对Scala2.9.2交叉构建的,Scala2.9.2没有这个Future
):
import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._
import ExecutionContext.Implicits.global
def toFutureString(value: Int) = Future(value.toString)
val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString
这会在无限流中立即返回,所以我们确信它不是首先消耗的。
作为一个脚注:如果你查看Future.traverse
的源代码,你会发现它是根据foldLeft
实现的,这很方便,但在流的情况下不是必要的或合适的。
忘记流:
import scala.concurrent.Future
import ExecutionContext.Implicits.global
val x = 1 to 10 toList
def toFutureString(value : Int) = Future {
println("starting " + value)
Thread.sleep(1000)
println("completed " + value)
value.toString
}
收益率(在我的8芯盒子上):
scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = scala.concurrent.impl.Promise$DefaultPromise@2d9472e2
scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10
因此,它们中的8个会立即启动(每个核心一个,尽管这可以通过线程池执行器进行配置),然后随着它们的完成,更多的会启动。Future[List[String]]会立即返回,然后在暂停后开始打印那些"已完成的x"消息。
例如,当您有一个List[Url’s]和一个Url=>Future[HttpResponseBody]类型的函数时,可以使用它。您可以使用该函数对该列表调用Future.traverse,并并行启动这些http请求,返回一个结果列表。
是不是和你想要的一样?
由于Scalaz traverse()
的现代版本表现不同,并试图在调用时消耗整个流,因此接受的答案不再有效。
至于这个问题,我想说,不可能以一种真正的无障碍方式实现这一点。
在Stream[Y]
可用之前,无法解析Future[Stream[Y]]
。由于Y
是由函数X => Future[Y]
异步生成的,因此在遍历Stream[Y]
时,如果不阻塞,就无法获得Y
。这意味着,要么在解析Future[Stream[Y]]
之前必须解析所有Future[Y]
(这需要消耗整个流),要么必须允许在遍历Stream[Y]
时发生块(对于基础期货尚未完成的项目)。但是,如果我们允许对遍历进行阻塞,那么最终的未来的完成定义是什么?从这个角度来看,它可能与Future.successful(BlockingStream[Y])
相同。这反过来在语义上等于原始Stream[Future[Y]]
。
换句话说,我认为这个问题本身就有问题。