如何调用一个接一个返回Future的回调列表



我需要一种机制来异步调用一些回调…所以我实现了下面的类:

class AsyncCallbacks[T] {
  private val callbacks = new ListBuffer[T => Future[Unit]]()
  def +=(f: T => Future[Unit]) = callbacks += f
  def -=(f: T => Future[Unit]) = callbacks -= f
  def invoke(data: T) = Future.sequence(callbacks.map(_(data)))
}
...
def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }
val callbacks = new AsyncCallbacks[Int]
callbacks += f1
callbacks += f2
callbacks.invoke(5)

callbacks.invoke产生scala.concurrent.Future[scala.collection.mutable.ListBuffer[Unit]]…我想知道是否有一种更好,更有效的方法来调用所有已注册的回调,而不会生成无用的Unit s列表。

上面的实现还有另一个问题…让我们假设我们有以下方法…

def l1 = Future { List.fill(5)("1") }
def l2 = Future { List.fill(5)("2") }

…然后像这样调用它们:

for {
  a <- l1
  b <- l2
  c <- callbacks.invoke(5)
} yield b

callbacks.invoke works…但是它似乎永远不会返回…

编辑

好的,我已经尝试重新实现我的AsyncCallbacks类使用scalaz由i.k.建议:

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.concurrent.Task
class AsyncCallbacks[T] {
  private val tasks = new ListBuffer[Task[T => Future[Unit]]]()
  /** Gets the number of callbacks registered. */
  def count = tasks.length
  /** Clears all the registered callbacks. */
  def clear = tasks.clear
  /* Adds the specified function to the list of callbacks to be invoked. */
  def +=(f: T => Future[Unit]) = tasks += Task(f)
  /** Invokes all the registered callbacks. */
  def invoke(data: T) = Future { Task.gatherUnordered(tasks).map(_.map(_(data))).run.length }
}

它的用法如下:

def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }
val callbacks = new AsyncCallbacks[Int]()
callbacks += f1
callbacks += f2
callbacks.invoke(4) // prints 4 two times (f1 + f2)

现在从REPL中执行上面的代码…然后尝试调用' callback .invoke(4)很多很多次,你会看到你不再能够退出REPL(它仍然被阻塞,你必须用CTRL-C退出)。我认为这可能是一个真正的应用程序的问题。

从你的帖子看来,无论你想要放入Future主体的数据类型如何,你都希望它完成并得到通知。

在Scalaz中,它将被建模为Task,本质上是Future的底层,但带有额外的功能。

一些例子,

scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> val tasks = (1 |->  5).map(n => Task { Thread.sleep(100); n })
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@72b64eae, scalaz.concurrent.Task@3f6a6af, scalaz.concurrent.Task@5ba0314c, scalaz.concurrent.Task@36718c9f, scalaz.concurrent.Task@767277c1)
scala> Task.gatherUnordered(tasks).run
res10: List[Int] = List(4, 1, 2, 3, 5)
scala> Task.gatherUnordered(tasks).run
res11: List[Int] = List(3, 1, 2, 4, 5)
scala> Task.gatherUnordered(tasks).run
res12: List[Int] = List(2, 1, 3, 4, 5)

可以看到,每次运行这些任务时,输出都是不同的。Task的实现是不确定的。

以你为例,

scala> val tasks = List(Task{1},Task{2})
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@2858b10a, scalaz.concurrent.Task@3782f5d8)
scala> Task.gatherUnordered(tasks).run
res13: List[Int] = List(1, 2)
scala> val tasks = List(Task{List.fill(5)("1")}, Task{List.fill(5)("2")})
tasks: List[scalaz.concurrent.Task[List[String]]] = List(scalaz.concurrent.Task@1c8dd945, scalaz.concurrent.Task@71f8e5ff)
scala> Task.gatherUnordered(tasks).run
res17: List[List[String]] = List(List(1, 1, 1, 1, 1), List(2, 2, 2, 2, 2))

最新更新