如何在使用播放迭代处理数据流时等待免费的 Akka actor



我有无限流,消息表示为我Iteratee应用的播放Enumerator。然后,每条消息由 Akka 参与者处理(参与者数量限制为 10 个)。

现在,我希望Iteratee中的代码在所有 10 个参与者都忙于时异步等待空闲演员,并且不向他们发送导致异常Ask timed out on ...的另一条消息。

如何实现此类功能?有没有更好的方法在没有await的情况下处理 10 个演员的无限流?

我正在谈论的代码示例可能如下所示:

val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(10)))
val it = Iteratee.foreach[Msg] { msg => 
  workers ? msg
}
msgEnumerator.apply(it)

Iteratee.foldM与您在此处的参与者询问模式一起使用似乎是正确的方法。假设你不希望你的演员建立大邮箱(如果你不关心大邮箱,只需使用 tellIteratee.foreach 而不是ask)这将需要一些专门的路由逻辑。由于用于创建自定义 akka 路由器的 api 不支持异步,因此您需要一个自定义 actor 来处理一次仅向 actor 池中的每个参与者分发一件工作的逻辑。

我想它的工作原理是这样的:

class WorkDistributor extends Actor {
  final val NUM_WORKERS = 10
  val workers = context.actorOf(Props[MyWorker].withRouter(RoundRobinRouter(NUM_WORKERS))) 
  var numActiveWorkers = 0
  var queuedWork: Option[Work] = None
  def receive = {
    case IterateeWork(work) if numActiveWorkers < NUM_WORKERS => workers ! work; numActiveWorkers += 1; sender ! SendMeMoreWork
    case IterateeWork(work) => queuedWork = Some(work)
    case ActorFinishedWork if queuedWork.isDefined => queuedWork.foreach(workers ! _); queuedWork = None
    case ActorFinishedWork => numActiveWorkers -= 1; sender ! SendMeMoreWork
  }
}

其中,IterateeWork消息由迭代者发送,ActorFinishedWork消息由执行组件池中的参与者发送。

看看我写的这个东西,这应该重写为使用become来更改演员池已满时的行为(而不是每种情况下的if过滤器,但我将其留给读者练习。

那么你的Iteratee会看起来像

Iteratee.foldM[Work, SendMeMoreWork.type](SendMeMoreWork) {
  case (_, work) => workDistributor ? IterateeWork(work)
}

最新更新