我需要一种机制来异步调用一些回调…所以我实现了下面的类:
class AsyncCallbacks[T] {
private val callbacks = new ListBuffer[T => Future[Unit]]()
def +=(f: T => Future[Unit]) = callbacks += f
def -=(f: T => Future[Unit]) = callbacks -= f
def invoke(data: T) = Future.sequence(callbacks.map(_(data)))
}
...
def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }
val callbacks = new AsyncCallbacks[Int]
callbacks += f1
callbacks += f2
callbacks.invoke(5)
callbacks.invoke
产生scala.concurrent.Future[scala.collection.mutable.ListBuffer[Unit]]
…我想知道是否有一种更好,更有效的方法来调用所有已注册的回调,而不会生成无用的Unit
s列表。
上面的实现还有另一个问题…让我们假设我们有以下方法…
def l1 = Future { List.fill(5)("1") }
def l2 = Future { List.fill(5)("2") }
…然后像这样调用它们:
for {
a <- l1
b <- l2
c <- callbacks.invoke(5)
} yield b
callbacks.invoke
works…但是它似乎永远不会返回…
编辑
好的,我已经尝试重新实现我的AsyncCallbacks
类使用scalaz由i.k.建议:
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.concurrent.Task
class AsyncCallbacks[T] {
private val tasks = new ListBuffer[Task[T => Future[Unit]]]()
/** Gets the number of callbacks registered. */
def count = tasks.length
/** Clears all the registered callbacks. */
def clear = tasks.clear
/* Adds the specified function to the list of callbacks to be invoked. */
def +=(f: T => Future[Unit]) = tasks += Task(f)
/** Invokes all the registered callbacks. */
def invoke(data: T) = Future { Task.gatherUnordered(tasks).map(_.map(_(data))).run.length }
}
它的用法如下:
def f1(i: Int) = Future { println(i) }
def f2(i: Int) = Future { println(i) }
val callbacks = new AsyncCallbacks[Int]()
callbacks += f1
callbacks += f2
callbacks.invoke(4) // prints 4 two times (f1 + f2)
现在从REPL中执行上面的代码…然后尝试调用' callback .invoke(4)很多很多次,你会看到你不再能够退出REPL(它仍然被阻塞,你必须用CTRL-C退出)。我认为这可能是一个真正的应用程序的问题。
从你的帖子看来,无论你想要放入Future
主体的数据类型如何,你都希望它完成并得到通知。
在Scalaz中,它将被建模为Task
,本质上是Future
的底层,但带有额外的功能。
一些例子,
scala> import scalaz.concurrent.Task
import scalaz.concurrent.Task
scala> val tasks = (1 |-> 5).map(n => Task { Thread.sleep(100); n })
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@72b64eae, scalaz.concurrent.Task@3f6a6af, scalaz.concurrent.Task@5ba0314c, scalaz.concurrent.Task@36718c9f, scalaz.concurrent.Task@767277c1)
scala> Task.gatherUnordered(tasks).run
res10: List[Int] = List(4, 1, 2, 3, 5)
scala> Task.gatherUnordered(tasks).run
res11: List[Int] = List(3, 1, 2, 4, 5)
scala> Task.gatherUnordered(tasks).run
res12: List[Int] = List(2, 1, 3, 4, 5)
可以看到,每次运行这些任务时,输出都是不同的。Task
的实现是不确定的。
以你为例,
scala> val tasks = List(Task{1},Task{2})
tasks: List[scalaz.concurrent.Task[Int]] = List(scalaz.concurrent.Task@2858b10a, scalaz.concurrent.Task@3782f5d8)
scala> Task.gatherUnordered(tasks).run
res13: List[Int] = List(1, 2)
scala> val tasks = List(Task{List.fill(5)("1")}, Task{List.fill(5)("2")})
tasks: List[scalaz.concurrent.Task[List[String]]] = List(scalaz.concurrent.Task@1c8dd945, scalaz.concurrent.Task@71f8e5ff)
scala> Task.gatherUnordered(tasks).run
res17: List[List[String]] = List(List(1, 1, 1, 1, 1), List(2, 2, 2, 2, 2))