使用映射函数中的执行组件时超时和崩溃



我想做这样的事情:

对于集合中的每个项目,在等待响应时询问一个 actor 和块,如果抛出超时异常,我想继续下一个项目。

下面是代码模式:

implicit val timeout: akka.util.Timeout = 3 seconds
collection.foreach { item =>
val future = (actor ? Request(msg = item)).mapTo[Response]
future.onComplete {
case Failure(ex) => // log ex
case Success(resp) => // use resp
}
Await.result(future, 3 seconds)
}

演员本身正在呼叫其他演员,这可能需要比我 3 秒的时间更长的时间。

这不会按预期工作:在第一项超时后,整个事情崩溃并停止。有一些死信通知,我想这是因为当我的演员调用的演员完成时,原始发件人无效(花了 3 秒多(。所以我的问题是我如何告诉它忘记超时项目并继续其余项目,就好像什么都没发生一样?

>@stefanobaghino是对的。请参阅此处,如文档中所写,如果将来包含异常,则Await.result抛出是为了可以正确处理它。

在这里,您正在匹配未来的故障情况,但您没有从中恢复。更好的方法是遵循 -

collection.foreach { item =>
val future = (actor ? Request(msg = item)).mapTo[Response]
future.recover {
case ex: Exception =>
// log ex
Response(ex.message) // some other object of type Response
}
val response = Await.result(future, 3 seconds) 
// use response here
}

在阅读了@Dimitri的答案后,我尝试以毫秒为单位记录时间戳,以查看它导致整个过程延迟的位置,我发现了相当奇怪的行为。我观察到,每当有死信时,甚至开始处理给演员的下一条消息都会有很大的滞后。不知道为什么会这样。下面是我试图检查它的代码 -

package com.lightbend.akka.sample
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSystem, Props }
import akka.pattern.{ ask, pipe, AskTimeoutException }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.io.StdIn
import scala.util.{ Try, Success, Failure }
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.TimeoutException
object AkkaQuickStart {
class NumberActor extends Actor {
override def receive: Receive = {
case (num: Int, startAt: Long) =>
println("B " +  num.toString + " : " + System.currentTimeMillis().toString + " : " + (System.currentTimeMillis() - startAt).toString)
Thread.sleep(500 * num)
sender() ! "OK"
}
}
def main(args: Array[String]): Unit = {
implicit val timeout: akka.util.Timeout = 1 seconds
val numActor = ActorSystem("system").actorOf(Props(new NumberActor()))
val range = (1 to 5) ++ (4 to 1 by -1)
println(range)
def lag(implicit startAt: Long): String = (System.currentTimeMillis() - startAt).toString
range.map { r =>
implicit val startAt = System.currentTimeMillis()
println("A " +  r.toString + " : " + System.currentTimeMillis().toString + " : " + lag)
val future = (numActor ? (r, startAt))
.recover {
case ex: AskTimeoutException =>
println("E " +  r.toString + " : " + System.currentTimeMillis().toString + " : " + lag)
"Ask timeout"
}
.mapTo[String]
future.onComplete{
case Success(reply) => 
println("C " +  r.toString + " : " + System.currentTimeMillis().toString + " : " + lag + " : success " + reply)
case Failure(reply) => 
println("C " +  r.toString + " : " + System.currentTimeMillis().toString + " : " + lag + " : failure")
}
Try(Await.result(future, 1 seconds)) match {
case Success(reply) => 
println("D " +  r.toString + " : " + System.currentTimeMillis().toString + " : " + lag + " : " + reply)
case Failure(ex) => 
println("D " +  r.toString + " : " + System.currentTimeMillis().toString + " : " + lag + " : Await timeout ")
}
}
}
}

我尝试了 Ask 超时和等待超时的不同组合,发现在迭代结束时开始处理发送的参与者消息时存在以下滞后 -

询问超时 = 1 等待超时 = 1 => 3000 - 4500 ms 导致死信

询问超时 = 1 等待超时 = 3 => 3000 - 4500 ms 导致死信

询问超时 = 3 等待超时 = 1 => 3000 - 4500 ms 导致死信

询问超时 = 3 等待超时 = 3 => 0 - 500 毫秒不会导致死信

我不确定,但猜测调度程序需要时间来处理死信,因此无法开始处理我们 Actor 的消息。可能一些更有经验的人可以解释一下。

@stefanobaghino @Tarun 感谢您的帮助,我想我现在明白了。

所以问题是有 2 个超时可能会导致异常:

  • 如果我们必须等待的时间超过参与者响应的时间长,则询问 (?( 超时会抛出akka.pattern.AskTimeoutException

  • 如果我们不等待足够长的时间等待未来完成,Await.result就会java.util.concurrent.TimeoutException

这两者都可能导致整个事情崩溃。对于第一个,正如您提到的,我们可以添加恢复以返回一些默认值。对于第二个,我们还应该捕获并处理异常。

在以下示例中,在更改两个超时和删除recover/Try时,您可以看到不同的行为:

object Example {
class NumberActor extends Actor {
override def receive: Receive = {
case num: Int =>
Thread.sleep(250 * num)
sender() ! "OK"
}
}
def main(): Unit = {
implicit val timeout: akka.util.Timeout = 1 seconds
val numActor = ActorSystem("system").actorOf(Props(new NumberActor()))
val range = (1 to 5) ++ (4 to 1 by -1)
println(range)
range.map { r =>
val future = (numActor ? r)
.recover { case ex: TimeoutException => "FAIL" }
.mapTo[String]
Try(Await.result(future, 1 seconds)) match {
case Success(reply) => println(reply)
case Failure(ex) => println(ex)
}
}
}
}

最新更新