Akka数据流和副作用



我正在使用akka数据流,我想知道是否有一种方法可以使特定的代码块等待未来的完成,而无需显式使用该未来的值。

实际的用例是,我有一个文件,我希望在特定的未来完成时删除该文件,而不是在此之前。这里有一个粗略的例子。首先假设我有这样一个服务:

trait ASync {
   def pull: Future[File]
   def process(input : File): Future[File]
   def push(input : File): Future[URI]
}

我有一个工作流,我想以非阻塞的方式运行:

val uriFuture = flow {
    val pulledFile = async.pull(uri)
    val processedile = async.process(pulledFile())
    val storedUri = async.push(processedFile())
    // I'd like the following line executed only after storedUri is completed, 
    // not as soon as pulled file is ready.
    pulledFile().delete()
    storedUri()
}

你可以尝试这样做:

val uriFuture = flow {
  val pulledFile = async.pull(uri)
  val processedile = async.process(pulledFile())
  val storedUri = for(uri <- async.push(processedFile())) yield {
    pulledFile().delete()
    uri
  }
  storedUri()
}

在本例中,只有当push中的Future成功时才会调用pulledFile.delete。如果失败,则不会调用deletestoredUri future的结果仍然是调用push的结果。

或者另一种方式是:

val uriFuture = flow {
  val pulledFile = async.pull(uri)
  val processedile = async.process(pulledFile())
  val storedUri = async.push(processedFile()) andThen{
    case whatever => pulledFile().delete()
  }
  storedUri()
}

这里的区别在于,无论push成功还是失败,都将调用deletestoredUri的结果仍然是调用push的结果。

您可以在非阻塞工作流中使用回调:

future onSuccess {
  case _ => file.delete() //Deal with cases obviously... 
}

来源:http://doc.akka.io/docs/akka/snapshot/scala/futures.html

或者,您可以使用wait.result:

阻塞
val result = Await.result(future, timeout.duration).asInstanceOf[String]

后者通常在需要阻塞时使用-例如在测试用例中-而非阻塞性能更高,因为您不需要停放一个线程来启动另一个线程,然后再次恢复另一个线程-由于资源管理开销,这比异步活动慢。

类型安全人员称它为"Reactive"。这是一个时髦的词。如果你在工作场合用这个词,我会笑你的。

最新更新