Akka ActorSystem 处理同一事件两次



我们有一个Scala Akka ActorSystem设置,每天处理数百万个事件,从历史上看,我们很少发现1或2个事件被处理了两次,但最近重复事件在某些日子增加到100个左右。

我们的设置简化如下:

// EventJob runs once every 10 seconds
class EventJob extends Actor {
val EventListnerPoolOfActors = ActorSystem().actorOf(
RoundRobinPool(10)
.props(Props(classOf[EventHandler])),
"InjectorActorID"
)
override def preStart(): Unit = {
self ! ReceivedJobStart()
}
def receive: Actor.Receive = {
case ReceivedJobStart() =>
doWork()
context.system.scheduler.scheduleOnce(10, self, ReceivedJobStart())
}
def doWork(): Future[Unit] = {
// returns Future[Seq[Event]]
getUnprocessEvents().map { x =>
{
// pass each Event to an EventHandler Actor to process
for (a <- 0 to x.size) {
EventListnerPoolOfActors ! x(a)
}
}
}
}
}
class EventHandler extends Actor {
def receive = {
...
}
}

每个事件都有一个唯一的ID,在我们的日志中,它显示一些事件在几毫秒内被处理了两次(转到EventHandler.receive(。所有演员都是本地人。

AFAIK 默认的消息传递可靠性最多为一次,不断增加的消息数量似乎被多次传递的原因是什么,以及如何减少这个问题?

我们的系统设置为处理重复项,我们只是不知道为什么最近它似乎在增加,并希望减少它。

假设您的系统不会生成具有相同 ID 的重复工作单元,系统有时会多次处理消息的一个可能原因是事件分发给EventHandler参与者的方式; 这与 Akka 的消息传递保证无关。

考虑您的getUnprocessEvents()方法。它返回一个Future[Seq[Event]]并在执行组件的常规消息处理之外运行,并且在再次调用消息之前mSeq中删除消息getUnprocessEvents()没有保险。消息被推送给工人,而不考虑他们是否可以从事更多工作。在随后调用getUnprocessEvents()时,工作人员可能仍在处理m的消息,在这种情况下,m将再次发送到工作人员的邮箱。使用调度程序定期调用此方法(即,指定一个时间窗口以使工作线程参与者有足够的时间来处理其消息(是一种有缺陷的协调工作的方法。

更好的方法是使工作队列成为工作协调员参与者状态的一部分(即,使队列成为参与者中的内部变量,并通过参与者消息传递改变队列(并使用工作拉取模式。另外,请考虑使用 Akka Streams。

作为旁注,EventJob创建了一个新ActorSystem

val EventListnerPoolOfActors = ActorSystem().actorOf(...)

每个应用程序只能有一个ActorSystem。请改用context

val EventListnerPoolOfActors = context.actorOf(...)

最新更新