不会每次都处理 AKKA 持久函数



我在集群中使用带有分片的 akka 的 PersistentActor 来跟踪我的状态。我有一个"房间",我可以通过以下代码更新:

case UpdateRoom(id, room, userId) => ((ret: ActorRef) => {
  (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map { role =>
    if (role == "owner") {
      state = Some(room)
      ret ! Success(room)
      eventRegion ! RoomEventPackage(id, RoomUpdated(room))
      println("works")
      persist(RoomUpdated(room))(e => println("this doesn't always fire")
    } else {
      ret ! Failure(InsufficientRights(role, "Update Room"))
    }
  }

问题是持久化每隔一段时间才起作用,而函数的其余部分按预期工作。("作品"每次都印出来,"这并不总是火"每隔一段时间,但会印两次)。我总是必须触发两次更新命令来存储我的事件,但随后它似乎被存储了两次我触发命令的两次。

我是否错过了 akka 持续存在的重要部分?

我认为您在Actor世界中犯了一个严重的错误:从外部访问参与者(可变)状态。在您的情况下,这在 ask/? 返回的Future的回调中发生两次:

  • 更新时状态:state = Some(room)
  • 拨打persist

处理从Actor内部询问并随后修改参与者状态的唯一安全方法是从ask的回调向同一参与者发送消息,为此,您可以使用pipeTo

使用简化版本的代码来说明:

case UpdateRoom(id, room, userId) => 
  val answer = (userRegion ? GetRoleForRoom(userId, id)).mapTo[String] map(role => RoleForRoom(id, room, userId, role))
  answer piepTo self
case RoleForRoom(id, room, userId, room) => 
  if (role == "owner") {
    state = Some(room)
    eventRegion ! RoomEventPackage(id, RoomUpdated(room))
    persist(RoomUpdated(room))(e => println("this is safe"))
  }

另请参阅:https://doc.akka.io/docs/akka/2.5.6/scala/general/jmm.html#actors-and-shared-mutable-state

最新更新