处理程序接收的每条消息都包含聚合根的整个状态。然后系统就能够基于这些数据执行所需的操作。在我的场景中,根据消息中的数据授予访问权限,例如访问房间A &一条消息包含了所有被授予的访问权限。这些消息可能会乱序到达,因为像MSMQ这样的消息传递系统不能保证有序传递。
消息#1授予访问房间A &B,但是消息#2只授予访问房间A的权限。如果它们无序到达,则授予访问房间A的权限,然后授予访问房间A的权限;B.这不是期望的结果。只允许进入房间A。每条消息都包含一个时间戳,该时间戳在发布时设置。我想使用这个时间戳来删除无序到达的旧的消息,例如,如果消息#2在消息#1之前到达,那么消息1#应该被丢弃。
我可以在每个处理程序方法中实现此过滤器,但这将是乏味的,所以我希望Rebus有一些EAI消息过滤器?
我对其他选项/实现持开放态度?
我不确定我理解为什么要在消息中传递整个聚合根,但除此之外,还有一种简单的方法,Rebus将使您能够对消息进行排序。
我建议你让你所有的消息实现一个公共接口来捕获这个特定的方面,例如像
public interface IHaveSequenceNumber
{
int SequenceNumber { get; }
}
然后,创建一个简单的消息处理程序来处理IHaveSequenceNumber
,并在遇到旧消息时中止处理程序管道,例如
public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber>
{
public void Handle(IHaveSequenceNumber messageWithSequenceNumber)
{
if (IsTooOld(messageWithSequenceNumber))
{
MessageContext.GetCurrent().Abort(); //< make this the last handler
}
}
}
然后-非常重要的是:)-你确保你的消息过滤器总是在调度开始时处理程序管道中的第一个:
Configure.With(...)
.Transport(...)
.SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>())
.(...)
我将把上面的IsTooOld()
方法的实现留给您—如果您在端点中只有一个工作线程,它可能会很简单,然而并发处理这些事情并不简单。明白了吗?