我正在使用akka数据流,我想知道是否有一种方法可以使特定的代码块等待未来的完成,而无需显式使用该未来的值。
实际的用例是,我有一个文件,我希望在特定的未来完成时删除该文件,而不是在此之前。这里有一个粗略的例子。首先假设我有这样一个服务:
trait ASync {
def pull: Future[File]
def process(input : File): Future[File]
def push(input : File): Future[URI]
}
我有一个工作流,我想以非阻塞的方式运行:
val uriFuture = flow {
val pulledFile = async.pull(uri)
val processedile = async.process(pulledFile())
val storedUri = async.push(processedFile())
// I'd like the following line executed only after storedUri is completed,
// not as soon as pulled file is ready.
pulledFile().delete()
storedUri()
}
你可以尝试这样做:
val uriFuture = flow {
val pulledFile = async.pull(uri)
val processedile = async.process(pulledFile())
val storedUri = for(uri <- async.push(processedFile())) yield {
pulledFile().delete()
uri
}
storedUri()
}
在本例中,只有当push
中的Future
成功时才会调用pulledFile.delete
。如果失败,则不会调用delete
。storedUri
future的结果仍然是调用push
的结果。
或者另一种方式是:
val uriFuture = flow {
val pulledFile = async.pull(uri)
val processedile = async.process(pulledFile())
val storedUri = async.push(processedFile()) andThen{
case whatever => pulledFile().delete()
}
storedUri()
}
这里的区别在于,无论push
成功还是失败,都将调用delete
。storedUri
的结果仍然是调用push
的结果。
您可以在非阻塞工作流中使用回调:
future onSuccess {
case _ => file.delete() //Deal with cases obviously...
}
来源:http://doc.akka.io/docs/akka/snapshot/scala/futures.html
或者,您可以使用wait.result:
阻塞val result = Await.result(future, timeout.duration).asInstanceOf[String]
后者通常在需要阻塞时使用-例如在测试用例中-而非阻塞性能更高,因为您不需要停放一个线程来启动另一个线程,然后再次恢复另一个线程-由于资源管理开销,这比异步活动慢。
类型安全人员称它为"Reactive"。这是一个时髦的词。如果你在工作场合用这个词,我会笑你的。