如何使用Akka实现类型安全、线程安全的边界



在akka类型中的参与者组之间实现模块边界的公认做法是什么?

TL/DR

下面是一个示例的工作回购
如何实现接收在两个不同协议中定义的消息(预(的单个参与者,类似于在OO中实现两个不同的接口。

示例

关于边界,我指的是经典的OO接口边界:只公开与其他模块相关的操作。

例如:想想爱丽丝、鲍勃和查理。爱丽丝喜欢和鲍勃说话,查理经常想知道鲍勃过得怎么样。查理不知道爱丽丝的事(也不应该知道(,反之亦然。在每对之间都存在一个协议,它们可以接收彼此的消息:

trait Protocol[ From, To ]
object Alice
{
sealed trait BobToAlice extends Protocol[ Bob, Alice ]
case object ApologizeToAlice extends BobToAlice
case object LaughAtAlice extends BobToAlice
}
object Bob
{
sealed trait AliceToBob extends Protocol[ Alice, Bob ]
case object SingToBob extends AliceToBob
case object ScoldBob extends AliceToBob
sealed trait CharlieToBob extends Protocol[ Charlie, Bob ]
case object HowYouDoinBob extends CharlieToBob
}
object Charlie
{
sealed trait BobToCharlie extends Protocol[ Bob, Charlie ]
case object CryToCharlie extends BobToCharlie
case object LaughToCharlie extends BobToCharlie
}

这里的边界是鲍勃的两张脸:与爱丽丝交谈和与查理交谈是两种不同的协议。现在,每个人都可以与Bob交谈,而无需了解对方的情况。例如,爱丽丝喜欢唱歌,但在唱歌时不会被嘲笑:

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors.same
import akka.actor.typed.{ ActorRef, Behavior }
class Alice( bob: ActorRef[ Protocol[ Alice, Bob ] ] )
{
import Alice._
import nl.papendorp.solipsism.protocol.Bob.{ ScoldBob, SingToBob }
val talkToBob: Behavior[ BobToAlice ] = Behaviors.receiveMessage
{
case LaughAtAlice =>
bob ! ScoldBob
same
case ApologizeToAlice =>
bob ! SingToBob
same
}
}

另一方面,查理只关心鲍勃此刻的感受:


import akka.actor.typed.scaladsl.Behaviors.{ receiveMessage, same }
import akka.actor.typed.{ ActorRef, Behavior }
class Charlie(bob: ActorRef[Protocol[Charlie,Bob]])
{
import Charlie._
import nl.papendorp.solipsism.protocol.Bob.HowYouDoinBob

val concerned: Behavior[BobToCharlie] = receiveMessage
{
case CryToCharlie =>
bob ! HowYouDoinBob
same
case LaughToCharlie =>
bob ! HowYouDoinBob
same
}
}

然而,爱丽丝对鲍勃情绪的影响影响了鲍勃和查理说话的方式。为此,我们需要通过BobsPersonalLife统一这两个协议,以便能够在一个参与者中表示它们:

import akka.actor.typed.scaladsl.Behaviors._
import akka.actor.typed.{ ActorRef, Behavior }
import Alice.BobToAlice
import Charlie.BobToCharlie
object Bob
{
private[ Bob ] sealed trait BobsPersonalLife
sealed trait AliceToBob extends Protocol[Alice, Bob] with BobsPersonalLife
case object SingToBob extends AliceToBob
case object ScoldBob extends AliceToBob
sealed trait CharlieToBob extends Protocol[Charlie, Bob] with BobsPersonalLife
case object HowYouDoinBob extends CharlieToBob
}
class Bob( alice: ActorRef[BobToAlice], charlie: ActorRef[BobToCharlie] )
{
import Alice._
import Bob._
import Charlie._

private val happy: Behavior[ BobsPersonalLife ] = receiveMessage
{
case HowYouDoinBob =>
charlie ! LaughToCharlie
same
case ScoldBob =>
alice ! ApologizeToAlice
sad
case SingToBob =>
alice ! LaughAtAlice
same
}
val sad: Behavior[ BobsPersonalLife ] = receiveMessage
{
case HowYouDoinBob =>
charlie ! CryToCharlie
same
case ScoldBob =>
alice ! ApologizeToAlice
same
case SingToBob  =>
alice ! LaughAtAlice
happy
}
}

到目前为止,一切都很好。我们可以使用ActorRef.narrow[ _X_ToBob ]实例化Alice和Charlie。但是鲍勃呢?或者更确切地说,鲍勃改变了自我?如果我们想用Boris代替Bob,Boris不向Charlie抱怨,而是向Doris抱怨,使用DorisToBob extends Protocol[ Doris, Bob ],我们就无法再接收来自Alice的消息,因为AliceToBobDorisToBob没有共享的超特性。突然间,BobsPersonalLife成为了每个Bob Alice都能与之交谈的锁定。

用鲍里斯取代鲍勃的方法是什么?如果我们使用ActorRef.unsafeUpcast,我们将失去类型安全性。如果我们在一个共享状态上使用两个参与者,我们就会失去线程安全性。包装_X_ToBob(例如Either[ AliceToBob, CharlieToBob ]或Dotty的简写联合类型(也不起作用,因为包装器只是接管了BobsPersonalLife的角色。当我们让DorisToBobBobsPersonalLife继承时,我们最终会得到所有波波头的所有可能伙伴的联盟——自我永远无法移除他们中的任何一个。

问题

我们如何在Bob内部实现Alice和Charlie之间的真正类型安全解耦?

我认为这几乎是一个X:Y问题("我如何在Akka中实现接口边界"与"我如何实现Akka中的接口边界目标"(。

object Protocol {
sealed trait Message
sealed trait LaughReply extends Message
sealed trait MoodReply extends Message
case class Apology(from: ActorRef[Singing]) extends Message
case class Singing(from: ActorRef[Laughing]) extends Message
case class Laughing(from: ActorRef[LaughReply]) extends Message with MoodReply
case class HowYouDoin(replyTo: ActorRef[MoodReply]) extends Message with LaughReply
case class Scolding(from: ActorRef[Apology]) extends Message with LaughReply
case class Crying(from: ActorRef[HowYouDoin]) extends Message with MoodReply
}
object Alice {
val talkToBob: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case Apology(from) =>
from ! Singing(context.self)
Behaviors.same
case Laughing(from) =>
from ! Scolding(context.self)
Behaviors.same
case _ =>  // Every other message is ignored by Alice
Behaviors.same
}
}
}
object Charlie {
val concerned: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case Crying(from) =>
from ! HowYouDoin(context.self)
Behaviors.same
case Laughing(from) =>
from ! HowYouDoin(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
}
object Bob {
val happy: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case HowYouDoin(replyTo) =>
replyTo ! Laughing(context.self)
Behaviors.same
case Scolding(from) =>
from ! Apology(context.self)
sad
case Singing(from) =>
from ! Laughing(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}

val sad: Behavior[Message] = Behaviors.receive { (context, msg) =>
msg match {
case HowYouDoin(replyTo) =>
replyTo ! Crying(context.self)
Behaviors.same
case Scolding(from) =>
from ! Apology(context.self)
Behaviors.same
case Singing(from) =>
from ! Laughing(context.self)
Behaviors.same
case _ =>
Behaviors.same
}
}
}

技巧基本上是通过mixin进行协议分解,并在消息中对协议状态(接受哪些消息(进行编码。只要没有人持有对ActorRef[Message]的引用(ActorRef是相反的,所以ActorRef[LaughReply]不是ActorRef[Message](,就没有办法发送目标没有承诺接受的消息。请注意,将一个ActorRef保持在一个参与者的状态是积极的:如果你要将另一个ActorRef保持在你的参与者的状态,这是一个非常强烈的迹象,表明你根本没有兴趣将它们解耦。

一种替代方案,而不是总体协议,是为Alice/Bob/Charlie等中的每一个都制定协议。命令和应答仅在该参与者的上下文中定义,并且使用例如键入的询问模式来将目标参与者的应答协议调整为请求参与者的命令协议。

最新更新