在将期货与参与者消息混合时,确保测试中的消息顺序



我正在测试一个使用基于未来的API的actor。参与者使用管道模式在将来完成时向自身发送消息:

import akka.pattern.pipe
// ...
// somewhere in the actor's receive method
futureBasedApi.doSomething().pipeTo(self)

在我的测试中,我模拟了 API,所以我通过承诺来控制未来的完成。但是,这会与直接发送给参与者的其他消息交错:

myActor ! Message("A")
promiseFromApiCall.success(Message("B"))
myActor ! Message("C")

现在我想知道如何保证演员收到和 在我的测试中处理消息 A 和 C 之间的消息 B,因为消息 B 实际上是在另一个线程中发送的,所以我无法控制顺序 参与者的邮箱在其中接收邮件。

我想到了几种可能的解决方案:

每条消息
  • 后休眠几毫秒,以生成另一条消息 订单不太可能

  • 等待执行组件确认每条消息,尽管 仅在测试时需要确认

  • 将消息 B 直接发送给 actor,以模拟完成 将来并编写一个单独的测试,以确保管道模式 正确使用(如果参与者会 不将结果消息管道到自身)

我真的不喜欢这两个选项中的任何一个,但我倾向于使用最后一个 一。有没有其他更好的方法可以在测试中强制执行特定的消息顺序?

澄清:问题不在于如何处理消息在生产中可能以随机顺序接收的事实。控制测试中的顺序对于确保参与者实际上可以处理不同的消息顺序至关重要。

一个想法是在参与者中定义一个标志,指示参与者是否已收到消息 B。当参与者收到消息 C 时,如果标志为 false,参与者可以存储该消息 C,然后在参与者收到消息 B 后将其取消存储。例如:

class MyActor extends Actor with Stash {
def receiveBlock(seenMsgB: Boolean, seenMsgC: Boolean): Receive = {
case MakeApiCall =>
callExternalApi().mapTo[MessageB].pipeTo(self)
case m: MessageB if seenMsgC => // assume msg C has been stashed
unstashAll()
// ...do something with msg B
become(receiveBlock(true, seenMsgC)) // true, true
case m: MessageB if !seenMsgC =>
// ...do something with message B
become(receiveBlock(true, seenMsgC)) // true, false
case m: MessageC if seenMsgB =>
// ...do something with message C
context.become(receiveBlock(seenMsgB, true)) // true, true
case m: MessageC if !seenMsgB =>
stash()
context.become(receiveBlock(seenMsgB, true)) // false, true
case ...
}
def receive = receiveBlock(false, false)
}

在阅读了更多关于 akka 的信息后,我终于找到了一个更好的解决方案:将 actor 邮箱替换为我可以在测试中观察到的邮箱。这样我就可以等到演员在我完成承诺后收到新消息。只有这样,才会发送下一条消息。此TestingMailbox的代码在帖子末尾给出。

更新:在 Akka Typed 中,这可以通过BehaviorInterceptor非常优雅地实现。只需使用自定义拦截器包装被测Behavior,该拦截器转发所有消息和信号,但让您观察它们。 下面给出了非类型化 Akka 的邮箱解决方案。


Actor可以这样配置:

actorUnderTest = system.actorOf(Props[MyActor]).withMailbox("testing-mailbox"))

我必须通过提供配置来确保"测试邮箱"被参与者系统知道:

class MyTest extends TestKit(ActorSystem("some name",
ConfigFactory.parseString("""{ 
testing-mailbox = {
mailbox-type = "my.package.TestingMailbox" 
}
}"""))) 
with BeforeAndAfterAll // ... and so on

设置完成后,我可以像这样更改我的测试:

myActor ! Message("A")
val nextMessage = TestingMailbox.nextMessage(actorUnderTest)
promiseFromApiCall.success(Message("B"))
Await.ready(nextMessage, 3.seconds)
myActor ! Message("C")

用一点辅助方法,我甚至可以这样写:

myActor ! Message("A")
receiveMessageAfter { promiseFromApiCall.success(Message("B")) }
myActor ! Message("C")

这是我的自定义邮箱:

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config 
import scala.concurrent.{Future, Promise}
object TestingMailbox {
val promisesByReceiver =
scala.collection.concurrent.TrieMap[ActorRef, Promise[Any]]()
class MessageQueue extends UnboundedMailbox.MessageQueue {
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
super.enqueue(receiver, handle)
promisesByReceiver.remove(receiver).foreach(_.success(handle.message))
}
}
def nextMessage(receiver: ActorRef): Future[Any] =
promisesByReceiver.getOrElseUpdate(receiver, Promise[Any]).future
}
class TestingMailbox extends MailboxType
with ProducesMessageQueue[TestingMailbox.MessageQueue] {
import TestingMailbox._
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]) =
new MessageQueue()
}

如果对消息进行排序非常重要,您应该使用返回Futureask(?)并将它们链接起来,即使您不希望来自参与者的任何响应。

最新更新