Akka路由器从路由到达的消息增加计数器



我试着继续指望每一个成功的导入。但这里有一个问题——Counter工作,如果路由器从它的父节点接收到消息,但如果我试图从它的子节点发送消息,它会接收到消息,但不会更新超出作用域的全局变量。我知道这听起来很复杂。让我给你看代码。这是路由器

    class Watcher(size: Int) extends Actor { 
      var router = {
        val routees = Vector.fill(size) {
          val w = context.actorOf(
            Props[Worker]
          )
          context.watch(w)
          ActorRefRoutee(w)
        }
        Router(RoundRobinRoutingLogic(), routees)
      }
      var sent = 0
      override def supervisorStrategy(): SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 100) {
        case _: DocumentNotFoundException => {
          Resume
        }
        case _: Exception => Escalate
      }
      override def receive: Receive = {
        case container: MessageContainer =>
          router.route(container, sender)
        case Success =>
          sent += 1
        case GetValue =>
          sender ! sent
        case Terminated(a) =>
          router.removeRoutee(a)
          val w = context.actorOf(Props[Worker])
          context.watch(w)
          router = router.addRoutee(w)
        case undef =>
          println(s"${this.getClass} received undefinable message: $undef")
      }
}

这是工人

class Worker() extends Actor with ActorLogging {
  var messages = Seq[MessageContainer]()
  var received = 0
  override def receive: Receive = {
    case container: MessageContainer =>
      try {
        importMessage(container.message, container.repo)
        context.parent ! Success
      } catch {
        case e: Exception =>
          throw e
      }
    case e: Error =>
      log.info(s"Error occurred $e")
      sender ! e
    case undef => println(s"${this.getClass} received undefinable message: $undef")
  }
}

supervisor ? GetValue上,我得到0,但假设有1000。最奇怪的是,当我在case Success => ...上使用断点进行调试时,每次新消息到达时,值都会增加。但supervisor ? GetValue仍然返回0。

让我们假设我想依靠case container: MessageContainer => ...,它将神奇地工作;我会得到理想的数字,但它不会显示我是否导入了任何东西。这是怎么呢

下面是测试用例。

 @Test
  def testRouter(): Unit = {
    val system = ActorSystem("RouterTestSystem")
//    val serv = AddressFromURIString("akka.tcp://master@host:1334")
    val supervisor = system.actorOf(Props(new Watcher(20)))//.withDeploy(akka.actor.Deploy(scope = RemoteScope(serv))))
    val repo = coreSession.getRepositoryName
    val containers = (0 until num)
      .map(_ => MessageContainer(MessageFactory.generate("/"), repo))
    val watch = Stopwatch.createStarted()
    (0 until num).par
      .foreach( i => {
      supervisor ! containers.apply(i)
    })
    implicit val timeout = Timeout(60 seconds)
    val future = supervisor ? GetValue
    val result = Await.result(future, timeout.duration).asInstanceOf[Int]
    val speed = result / (watch.elapsed(TimeUnit.MILLISECONDS) / 1000.0)
    println(f"Import speed: $speed%.2f")
    assertEquals(num, result)
  }

你能详细解释一下吗?为什么会发生这种情况?为什么只从孩子那里收到信息?另一种方法吗?

嗯…没有共享的代码部分可能隐藏着许多潜在的问题。但是,为了这次讨论,我将假设其他一切都很好,我们将只讨论您的共享代码的问题。

现在,让我解释一下actor。简单地说,每个参与者都有一个邮箱(它按照接收到的顺序保存消息),并按照接收到的顺序逐个处理这些消息。由于邮箱的使用方式与Queue类似,因此在本文中我们将其称为Queue。

也……我不知道这个container.apply(i)会返回什么…所以我将把container.apply(1)的返回值称为MessageContainer__1

在测试运行器中,您首先创建Watcher的实例,

val supervisor = system.actorOf(Props(new Watcher(20)))

现在,假设你要把这些2 messages (num = 2)发送到supervisor

所以主管的邮箱看起来像这样,

Queue(MessageContainer__0, MessageContainer__1)

然后发送另一条消息GetValue,这样邮箱将看起来像,

Queue(MessageContainer__0, MessageContainer__1, GetValue)
现在actor将处理第一条消息并将其传递给worker,邮箱将看起来像,
Queue(MessageContainer__1, GetValue)

现在,即使您的worker在发送回复时非常快速和即时,邮箱也会看起来像,

Queue(MessageContainer__1, GetValue, Success)

现在,由于您的worker超快速且立即回复Success,传递第二个MessageContainer后的状态将看起来像,

Queue(GetValue, Success, Success)

…这就是你问题的根源。Supervisor在任何Success消息之前看到GetValue消息,无论您的工人有多快。

因此,它将处理GetValue,并返回sent的当前值0。

相关内容

最新更新