我试着继续指望每一个成功的导入。但这里有一个问题——Counter工作,如果路由器从它的父节点接收到消息,但如果我试图从它的子节点发送消息,它会接收到消息,但不会更新超出作用域的全局变量。我知道这听起来很复杂。让我给你看代码。这是路由器
class Watcher(size: Int) extends Actor {
var router = {
val routees = Vector.fill(size) {
val w = context.actorOf(
Props[Worker]
)
context.watch(w)
ActorRefRoutee(w)
}
Router(RoundRobinRoutingLogic(), routees)
}
var sent = 0
override def supervisorStrategy(): SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 100) {
case _: DocumentNotFoundException => {
Resume
}
case _: Exception => Escalate
}
override def receive: Receive = {
case container: MessageContainer =>
router.route(container, sender)
case Success =>
sent += 1
case GetValue =>
sender ! sent
case Terminated(a) =>
router.removeRoutee(a)
val w = context.actorOf(Props[Worker])
context.watch(w)
router = router.addRoutee(w)
case undef =>
println(s"${this.getClass} received undefinable message: $undef")
}
}
这是工人
class Worker() extends Actor with ActorLogging {
var messages = Seq[MessageContainer]()
var received = 0
override def receive: Receive = {
case container: MessageContainer =>
try {
importMessage(container.message, container.repo)
context.parent ! Success
} catch {
case e: Exception =>
throw e
}
case e: Error =>
log.info(s"Error occurred $e")
sender ! e
case undef => println(s"${this.getClass} received undefinable message: $undef")
}
}
在supervisor ? GetValue
上,我得到0,但假设有1000。最奇怪的是,当我在case Success => ...
上使用断点进行调试时,每次新消息到达时,值都会增加。但supervisor ? GetValue
仍然返回0。
让我们假设我想依靠case container: MessageContainer => ...
,它将神奇地工作;我会得到理想的数字,但它不会显示我是否导入了任何东西。这是怎么呢
下面是测试用例。
@Test
def testRouter(): Unit = {
val system = ActorSystem("RouterTestSystem")
// val serv = AddressFromURIString("akka.tcp://master@host:1334")
val supervisor = system.actorOf(Props(new Watcher(20)))//.withDeploy(akka.actor.Deploy(scope = RemoteScope(serv))))
val repo = coreSession.getRepositoryName
val containers = (0 until num)
.map(_ => MessageContainer(MessageFactory.generate("/"), repo))
val watch = Stopwatch.createStarted()
(0 until num).par
.foreach( i => {
supervisor ! containers.apply(i)
})
implicit val timeout = Timeout(60 seconds)
val future = supervisor ? GetValue
val result = Await.result(future, timeout.duration).asInstanceOf[Int]
val speed = result / (watch.elapsed(TimeUnit.MILLISECONDS) / 1000.0)
println(f"Import speed: $speed%.2f")
assertEquals(num, result)
}
你能详细解释一下吗?为什么会发生这种情况?为什么只从孩子那里收到信息?另一种方法吗?
嗯…没有共享的代码部分可能隐藏着许多潜在的问题。但是,为了这次讨论,我将假设其他一切都很好,我们将只讨论您的共享代码的问题。
现在,让我解释一下actor。简单地说,每个参与者都有一个邮箱(它按照接收到的顺序保存消息),并按照接收到的顺序逐个处理这些消息。由于邮箱的使用方式与Queue
类似,因此在本文中我们将其称为Queue。
也……我不知道这个container.apply(i)
会返回什么…所以我将把container.apply(1)
的返回值称为MessageContainer__1
在测试运行器中,您首先创建Watcher
的实例,
val supervisor = system.actorOf(Props(new Watcher(20)))
现在,假设你要把这些2 messages (num = 2)
发送到supervisor
,
所以主管的邮箱看起来像这样,
Queue(MessageContainer__0, MessageContainer__1)
然后发送另一条消息GetValue
,这样邮箱将看起来像,
Queue(MessageContainer__0, MessageContainer__1, GetValue)
现在actor将处理第一条消息并将其传递给worker,邮箱将看起来像,
Queue(MessageContainer__1, GetValue)
现在,即使您的worker在发送回复时非常快速和即时,邮箱也会看起来像,
Queue(MessageContainer__1, GetValue, Success)
现在,由于您的worker超快速且立即回复Success
,传递第二个MessageContainer后的状态将看起来像,
Queue(GetValue, Success, Success)
…这就是你问题的根源。Supervisor
在任何Success
消息之前看到GetValue
消息,无论您的工人有多快。
因此,它将处理GetValue
,并返回sent
的当前值0。