Akka 2:如何暂停消息处理



在我使用Akka掌握Actor模型的过程中,会弹出许多问题。这是另一个。假设我们有一个Actor,由于某些业务逻辑或可用资源,它必须在给定时间内停止处理消息。可能发生这种情况的情况有:

  • 节流。可能有一个Actor发送电子邮件,但被限制为每秒只能发送一封电子邮件。

  • Actor可能会使用一些只能同时处理x消息的系统。这可能是一个AsyncHttpClient,它有一个固定的线程池,我不想重载它

  • 某些外部资源不可用,需要处理消息(读取:外部REST-API)

很可能我的大脑还没有为演员做好准备,我只需要一个提示,如何以演员的方式解决这些问题。

一般答案

参与者总是以最快的速度处理消息,处理意味着将消息从邮箱中取出并传递给参与者的行为。因此,行为就是你的答案所在:在需要非名义行动的时间段内,将其改变为更合适的行为。

节流

如果一个组件处理消息的速率低于产生消息的速率,那么它最终将不得不丢弃消息。要么使用一个有限制的邮箱,要么让一个经理站在最前面,跟踪员工的进度,并在压力时期进入"回复负面结果"模式。

当参与者想要调节自己的输出速率时,请使用context.system.scheduler

这应该回答你的前两点。

恢复

在所需资源不可用的时期,根据需求,您有两种选择:要么在内部对消息进行排队,要么进入"无序"回复模式。您也可以混合,即在有特定时间和空间限制的情况下排队,并在达到限制时失败。

进一步考虑

始终保持参与者处理的工作单元非常小,以便参与者能够在其延迟要求内做出反应。后者可以非常放松(连续运行数小时),也可以非常严格(必须以kHz速率处理消息)。

case object NextEmail
class EmailActor extends Actor {
self ! NextEmail
  def receive = {
    case NextEmail =>
      sendEmailIfAnyToSend
      context.system.scheduler.scheduleOnce(3 seconds, self, NextEmail)                 
  }
}

最新更新