MassTransit筛选用户可以处理的消息



我有一个像这样的通用消息接口:

public interface IMyMessage
{
int EventCode {get;}
}

现在我有多个消费者在处理这个消息:

public class MyConsumer1: IConsumer<IMyMessage>{...}
public class MyConsumer2: IConsumer<IMyMessage>{...}

我希望MyConsumer1只处理EventCode=1的消息,并让MyConsumer2处理EventCode=2的所有消息。

我知道我可以在Consume方法中执行if语句,但想知道是否有更好的方法,比如路由过滤器?

我的首选方法是创建一个属性,即HandlesEventCodeAttribute(1(并将其应用于Consumers。

我还将Autofac集装箱与MassTransit集成使用。

请帮忙。

感谢

在我对实际问题进行任何输入之前,我会问您为什么要使用具有属性的相同消息类型来确定哪些消费者实际使用该消息。有更好(更高效(的方法可用,例如使用与RabbitMQ的DIRECT交换。

您可以创建自己的属性,并创建一个中间件过滤器,该过滤器将查看消费者,查看它是否具有自定义属性,然后使用该属性中的值来检查消息,并在消费者对消息不感兴趣时对其进行过滤。

完整的工作样本如下所示:

首先,创建属性。

class EventCodeAttribute :
Attribute
{
public int EventCode { get; }
public EventCodeAttribute(int eventCode)
{
EventCode = eventCode;
}
}

消息类型:

interface IEventMessage
{
int EventCode { get; }
}

中间件过滤器:

class EventCodeFilter<TConsumer> :
IFilter<ConsumerConsumeContext<TConsumer, IEventMessage>>
where TConsumer : class
{
readonly int _eventCode;
public EventCodeFilter()
{
var attribute = typeof(TConsumer).GetCustomAttribute<EventCodeAttribute>();
if (attribute == null)
throw new ArgumentException("Message does not have the attribute required");
_eventCode = attribute.EventCode;
}
public async Task Send(ConsumerConsumeContext<TConsumer, IEventMessage> context, IPipe<ConsumerConsumeContext<TConsumer, IEventMessage>> next)
{
if (context.Message.EventCode.Equals(_eventCode))
{
await next.Send(context);
}
}
public void Probe(ProbeContext context)
{
var scope = context.CreateFilterScope("eventCode");
scope.Add("code", _eventCode);
}
}

示例消费者:

[EventCode(27)]
class EventCodeConsumer :
IConsumer<IEventMessage>
{
public async Task Consume(ConsumeContext<IEventMessage> context)
{
}
}

最后,配置消费者使用过滤器:

builder.AddMassTransit(cfg =>
{
cfg.AddConsumer<EventCodeConsumer>(x =>
x.ConsumerMessage<IEventMessage>(p => p.UseFilter(new EventCodeFilter<EventCodeConsumer>())));
});

最新更新