如何将Scala ARM与Futures结合使用



我想实现ARM(自动资源管理)模式,其中资源是异步使用的。

问题

假设我的资源看起来像:

class MyResource { 
def foo() : Future[MyResource] = ???
// Other methods returning various futures
def close() : Unit = ???
}
object MyResource { 
def open(name: String): Future[MyResource] = ???
} 

所需的使用模式是:

val r : Future[MyResource] = MyResource.open("name")
r flatMap (r => {
r.foo() /* map ... */ andThen {
case _ => r.close()
}
})

消隐映射函数可能很复杂,涉及分支和链接期货,这些期货会重复调用返回期货的r方法。

我希望确保在所有未来的延续完成(或失败)后调用r.close()。在每个呼叫站点手动执行此操作很容易出错。这就需要一个ARM解决方案。

尝试的解决方案

scala-arm库通常是同步的。这段代码不会做正确的事情,因为close()会在块内的期货完成之前被调用:

for (r <- managed(MyResource.open("name"))) {
r map (_.foo()) // map ...
}

我想使用这个包装器:

def usingAsync[T](opener: => Future[MyResource]) (body: MyResource => Future[T]) : Future[T] =
opener flatMap { 
myr => body(myr) andThen { case _ => myr.close() } }

然后呼叫站点将看起来像:

usingAsync(MyResource.open("name")) ( myr => {
myr.foo // map ...
})

但是,块内的代码将负责返回一个Future,该Future在该块创建的所有其他Future完成时完成。如果它不小心没有,那么资源将在所有使用它的期货完成之前再次关闭。并且不会有静态验证来捕捉这个错误。例如,这将是一个运行时错误:

usingAsync(MyResource.open("name")) ( myr => {
myr.foo() // Do one thing
myr.bar() // Do another
})

如何解决这个问题

显然,我可以使用scala-arm的定界连续支持(CPS)。它看起来有点复杂,我害怕出错。而且它需要启用一个编译器插件。此外,我的团队对scala非常陌生,我不想要求他们使用CPS。

CPS是唯一的出路吗?有没有一个库或设计模式可以通过Futures更简单地做到这一点,或者用scala-arm做这一点的例子?

反应式扩展(Rx)可能是一种替代解决方案。这种编程模式的发展势头越来越大,现在可以在包括Scala在内的许多语言中使用。

Rx的基础是创建一个Observable,它是异步事件的来源。可观察性可以以复杂的方式被锁住,这就是它的力量。您订阅了Observable来侦听onNext、onError和onComplete事件。您还将收到允许您取消订阅的订阅回复。

我认为您可能会在中添加resource.close()调用和onCompleted和/或onError处理程序。

参见RxScala文档了解:

Observable.subscribe(
onNext: (T) ⇒ Unit, 
onError: (Throwable) ⇒ Unit, 
onCompleted: () ⇒ Unit): Subscription

更多信息:

  • RxScala站点:http://rxscala.github.io/
  • RxScala可观测:http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable
  • Ben Christensen在NetFlix的精彩介绍:http://www.infoq.com/presentations/netflix-functional-rx
  • Erik Meijer在Martin Odersky、Erik Mejer和Roland Kuhn的Coursera课程《反应式编程原理》中给出了链式Observable的代码示例

相关内容

  • 没有找到相关文章

最新更新