如何使用Rebus过滤消息?



处理程序接收的每条消息都包含聚合根的整个状态。然后系统就能够基于这些数据执行所需的操作。在我的场景中,根据消息中的数据授予访问权限,例如访问房间A &一条消息包含了所有被授予的访问权限。这些消息可能会乱序到达,因为像MSMQ这样的消息传递系统不能保证有序传递。

消息#1授予访问房间A &B,但是消息#2只授予访问房间A的权限。如果它们无序到达,则授予访问房间A的权限,然后授予访问房间A的权限;B.这不是期望的结果。只允许进入房间A。每条消息都包含一个时间戳,该时间戳在发布时设置。我想使用这个时间戳来删除无序到达的旧的消息,例如,如果消息#2在消息#1之前到达,那么消息1#应该被丢弃。

我可以在每个处理程序方法中实现此过滤器,但这将是乏味的,所以我希望Rebus有一些EAI消息过滤器?

我对其他选项/实现持开放态度?

我不确定我理解为什么要在消息中传递整个聚合根,但除此之外,还有一种简单的方法,Rebus将使您能够对消息进行排序。

我建议你让你所有的消息实现一个公共接口来捕获这个特定的方面,例如像

public interface IHaveSequenceNumber
{
    int SequenceNumber { get; }
}

然后,创建一个简单的消息处理程序来处理IHaveSequenceNumber,并在遇到旧消息时中止处理程序管道,例如

public class WillDiscardOldMessages : IHandleMessages<IHaveSequenceNumber>
{
    public void Handle(IHaveSequenceNumber messageWithSequenceNumber)
    {
        if (IsTooOld(messageWithSequenceNumber))
        {
            MessageContext.GetCurrent().Abort(); //< make this the last handler
        }
    }
}

然后-非常重要的是:)-你确保你的消息过滤器总是在调度开始时处理程序管道中的第一个:

Configure.With(...)
    .Transport(...)
    .SpecifyOrderOfHandlers(s => s.First<WillDiscardOldMessages>())
    .(...)
我将把上面的IsTooOld()方法的实现留给您—如果您在端点中只有一个工作线程,它可能会很简单,然而并发处理这些事情并不简单。

明白了吗?

相关内容

  • 没有找到相关文章

最新更新