在参与者层次结构中以原子方式传递可变对象



我已经为我的Scala/Java应用程序构建了一个适当的actor层次结构,它主要依赖于fire-forget语义。我现在面临着在参与者之间以原子方式传递唯一可变对象的需求。特别是,我有三个演员,A,B和C:

  A
 / 
B   C

B和C都有自己的独特对象映射,彼此不了解。在某个时间点,B 将决定它需要摆脱对象 O我正在寻找一种机制,允许将对象 O 添加到 C 的映射中并以原子方式从 B映射中删除

B 不会决定接收对象 O 的是 C:它最初所做的只是向参与者 A 发送处置请求。参与者A可以允许或拒绝来自B的处置请求,并从那里向参与者B引入或不引入C以便他们自主和原子地完成交易。

编辑

我最初的问题是标签错误和困惑的。在我的系统中,消息是不可变的:我在参与者之间发送UUID,而不是对对象的引用。这些 UUID 是可变对象的私有每个参与者映射的密钥。B和C在其私有映射中持有的对象在任何给定时间点的使用都是相互排斥的。

对我来说,更进一步并确保 B 和 C

之间没有共享可变对象是微不足道的,也就是说,确保 B 映射中的键 K 和 C 映射中的键 K 指向具有相同 UUID 的不同私有可变对象(例如 Ob 和 Oc)。

一个目标是避免同时对 B 中的对象 Ob 和 C 中的 Oc 进行计算。这本身并不是一个真正的问题(我不介意在从演员B到演员C的过渡过程中偶尔浪费几个CPU周期),但它成为一个问题,因为演员B和C将其模拟结果报告给我们可以称为D的第三方客户端。

D 不知道 A、B 和 C 之间的关系,因此它可能从 B 和 C 接收关于同一 UUID 的结果,无法分辨它应该侦听哪一个。由于模拟的性质,这些结果可能不同且相互矛盾。当然,参与者 B 可以停止模拟对象 Ob,并向参与者 C 发送一条消息,告诉它在对象 Oc 上开始模拟。这将阻止客户端 D 从 B 和 C 接收有关同一对象的消息,但有一个时间范围,在此期间,模拟中可能完全没有此 UUID。也许这对我的应用程序不是很重要,但我仍然需要验证这一点。对我来说,理想的情况是UUID的演员同步切换。

两阶段提交协议可用于进行原子更新。

B -> A tell ("initiate moving of O B->C") 
A -> B tell ("prepare remove O") //
A -> C tell ("prepare add O") // (A selects C)
C changes O to prepare-to-add state
C -> A tell ("ready to add O")
B changes O to prepared-to-remove state.
B -> A tell ("ready to remove O")
A waits for two "ready" messages and then:
A -> B, C tell ("commit")
if A receives timeout, then
A -> B, C tell ("rollback")

为了使此协议可靠地工作,您需要在 B,C 中实现撤消/重做日志。

ScalaSTM可以使协议的实现更容易。我不太确定,但以下内容可能会给出一些如何实现目标的提示:

class O { val owner = Ref(None:Option[ActorRef]) }
class A extends Actor {
  val listOfTransferedObject = mutable.ListBuffer()
  def move(O, to) {
    atomic { implicit txn =>
      val oldOwner = O.owner()
      O.owner() = self 
      listOfTransferedObject += O
      oldOwner.get ! NotifyRemoved(O) // you may prefer to use `ask` 
      O.owner() = to
      O.owner().get ! NotifyAdded(O) // you may prefer to use `ask` 
    }
  }
}

在接收 B 和 C 时,您只需要更新内部映射。

我相信你可以使用Transactor完成你正在尝试做的事情。

具体来说,B和C应该包括Transactor性状。B 应实现 coordinate 方法,以便当 A 向 C 发送一条消息并告诉它继续更新其状态时,B 通过调用 include(C) 邀请 C 参与事务。两个参与者都应该实现atomically方法来处理其内部状态的实际更新(即更新其 Maps)。

像这样:

type Key = // your map key type
type Value = // your map value type
case class AtomicallyTransferObject(transferTo : ActorRef, key : Key, value : Value)
class B extends Actor with Transactor
{
   private val myMap = mutable.Map[Key, Value]
   def coordinate = {
      case AtomicallyTransferObject(transferTo, key, value) => include(transferTo)
   }
   def atomically = {implicit txn =>
      case AtomicallyTransferObject(_, key, value) => myMap -= (key, value)
   }
}
class C extends Actor with Transactor
{
   private val myMap = mutable.Map[Key, Value]
   def atomically = {implicit txn =>
      case AtomicallyTransferObject(self, key, value) => myMap += (key, value)
   }
}

参与者 A 将向 B 发送一条AtomicallyTransferObject消息,这将导致 B 邀请 C 加入事务,然后两个参与者都将以原子方式处理该消息。

有关交易处理器的更多信息,请参见:http://doc.akka.io/docs/akka/current/scala/transactors.html

请注意,对于非事务性消息,您应该实现normally,而不是实现receive,因为Transactor特征提供了委托给normallyatomicallyreceive的实现。

最新更新