了解崩溃时AKKA Actor默认行为



演员的默认行为是重新启动。为了获得更好的理解,我创建了两个演员和一次崩溃,以引起重新启动:

class PingActor extends Actor with ActorLogging {
  import PingActor._
  override def postRestart(reason: Throwable): Unit = {
    log.info(s"RESTARTING")
    super.postRestart(reason)
  }
  var counter = 0
  val pongActor = context.actorOf(PongActor.props, "pongActor")
  def receive = {
    case Initialize => 
      pongActor ! PingMessage("ping")   
    case PongActor.PongMessage(text) =>
      log.info("In PingActor - received message: {}", text)
      counter += 1
      if (counter == 3) {
         log.info("FIN")
        context.system.shutdown()
      }
      if (counter == 2) {
        sender() ! PingMessage("ping")
        throw new IllegalArgumentException("Aooch")
      }
      else sender() ! PingMessage("ping")
  } 
}
object PingActor {
  val props = Props[PingActor]
  case object Initialize
  case class PingMessage(text: String)
}
class PongActor extends Actor with ActorLogging {
  import PongActor._
  def receive = {
    case PingActor.PingMessage(text) => 
      log.info("In PongActor - received message: {}", text)
      sender() ! PongMessage("pong")
  }
}
object PongActor {
  val props = Props[PongActor]
  case class PongMessage(text: String)
}

因此,PingActor在崩溃之前发送消息,PongActor回复了Ping Actor重新启动时接受消息,但发件人(PongActor)不可用。我确实在无量工中看到了这条线

[info] [myactorsystem-akka.actor.default-dispatcher-4] [akka://myactorsystem/user/pingactor/pongactor]消息 [com.example.pancampActor $ pingmessage]来自 演员[akka://myactorsystem/user/pingactor#-1362690296] 演员[akka://myactorsystem/user/pingactor/pongactor#1725419686] 没送到。[1]遇到的死信。这个记录可以是 通过配置设置关闭或调整 'akka.log dead-letters'和'akka.log dead-ledters-dead-dearding shutdown'。

为什么在崩溃之前发送消息的发件人不可用?有没有办法克服它?

作为文档所述,默认行为重新启动时的默认行为是停止所有演员的孩子。这就是重新启动后与sender的消息发送给死信后的原因。您需要覆盖默认行为,并在preStart()钩中设置子女的初始化(即PongActor)。另外,删除呼叫super.postRestart(reason)

class PingActor extends Actor with ActorLogging {
  import PingActor._
  override def preStart(): Unit = {
    pongActor = Option(context.actorOf(PongActor.props, "pongActor"))
  }
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("Don't stop the children")
    postStop()
  }
  override def postRestart(reason: Throwable): Unit = {
    log.info("RESTARTING")
  }
  var counter = 0
  var pongActor: Option[ActorRef] = None
  def receive = {
    case Initialize => 
      pongActor.foreach(_ ! PingMessage("ping"))
    case PongActor.PongMessage(text) =>
      ...
  } 
}

进行上述更改会导致以下输出:

In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
Don't stop the children
RESTARTING
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
Don't stop the children
RESTARTING
In PingActor - received message: pong
In PongActor - received message: ping
In PingActor - received message: pong
In PongActor - received message: ping
Don't stop the children
RESTARTING
In PingActor - received message: pong
In PongActor - received message: ping
...

此处还可以找到有关重新启动的更多信息。

最新更新