我有无限流,消息表示为我Iteratee
应用的播放Enumerator
。然后,每条消息由 Akka 参与者处理(参与者数量限制为 10 个)。
现在,我希望Iteratee
中的代码在所有 10 个参与者都忙于时异步等待空闲演员,并且不向他们发送导致异常Ask timed out on ...
的另一条消息。
如何实现此类功能?有没有更好的方法在没有await
的情况下处理 10 个演员的无限流?
我正在谈论的代码示例可能如下所示:
val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(10)))
val it = Iteratee.foreach[Msg] { msg =>
workers ? msg
}
msgEnumerator.apply(it)
将Iteratee.foldM
与您在此处的参与者询问模式一起使用似乎是正确的方法。假设你不希望你的演员建立大邮箱(如果你不关心大邮箱,只需使用 tell
和 Iteratee.foreach
而不是ask
)这将需要一些专门的路由逻辑。由于用于创建自定义 akka 路由器的 api 不支持异步,因此您需要一个自定义 actor 来处理一次仅向 actor 池中的每个参与者分发一件工作的逻辑。
我想它的工作原理是这样的:
class WorkDistributor extends Actor {
final val NUM_WORKERS = 10
val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(NUM_WORKERS)))
var numActiveWorkers = 0
var queuedWork: Option[Work] = None
def receive = {
case IterateeWork(work) if numActiveWorkers < NUM_WORKERS => workers ! work; numActiveWorkers += 1; sender ! SendMeMoreWork
case IterateeWork(work) => queuedWork = Some(work)
case ActorFinishedWork if queuedWork.isDefined => queuedWork.foreach(workers ! _); queuedWork = None
case ActorFinishedWork => numActiveWorkers -= 1; sender ! SendMeMoreWork
}
}
其中,IterateeWork
消息由迭代者发送,ActorFinishedWork
消息由执行组件池中的参与者发送。
看看我写的这个东西,这应该重写为使用become
来更改演员池已满时的行为(而不是每种情况下的if
过滤器,但我将其留给读者练习。
那么你的Iteratee
会看起来像
Iteratee.foldM[Work, SendMeMoreWork.type](SendMeMoreWork) {
case (_, work) => workDistributor ? IterateeWork(work)
}