我们可以通过公共交通将RabbitMQ和Mediatr一起使用吗?



我创建了一个微服务应用程序,使用MassTransit和RabbitMQ进行微服务进行通信。
每个微服务都使用干净的架构开发,因此我们在每个微服务中都有MediatR
是否可以也使用公共交通进行内部通信? 所以我可以对所有服务使用相同的签名,当我想公开要在微服务间使用的服务时,它可以轻松实现。 所以MediatR用于内部通信,RabbitMQ用于内部通信,整个宇宙都在MassTransit系统上。
[更新]我的问题是如何配置消费者,以便有些可以用于内部通信(通过 MediatR(,有些可以用于外部通信(通过 RabbitMQ(,并轻松地将它们从内部更改为外部。
[更新2]例如,这是我的公共交通注册:

services.AddMassTransit(x =>
{
x.AddConsumers(Assembly.GetExecutingAssembly());
x.AddBus(provider =>
Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(config.RabbitMQ.Address), h =>
{
h.Username(config.RabbitMQ.Username);
h.Password(config.RabbitMQ.Password);
});
cfg.ReceiveEndpoint("my-queue", ep => { ep.ConfigureConsumers(provider); });
}));

x.AddMediator((provider, cfg) => { cfg.ConfigureConsumers(provider); });
});

我在内部沟通和外部沟通中有何不同?换句话说,我如何向 MediatR 注册一些消费者,将一些消费者注册到 RabbitMQ?

它们可以一起使用,并且 MassTransit 也有自己的 Mediator 实现,因此您可以编写一次处理程序,然后通过中介器或通过持久传输(如 RabbitMQ(使用它们。

有一些视频可以带您了解这些功能,从调解器开始,然后转到 RabbitMQ。

我发现我应该为每个创建单独的总线。 然后外部服务继承自像IExternalConsumer这样的接口,所以我可以将它们从内部服务中分离出来,并将它们添加到相关的总线中:针对版本 7 进行了更新

// find consumers
var types = AssemblyTypeCache.FindTypes(new[]{Assembly.GetExecutingAssembly()},TypeMetadataCache.IsConsumerOrDefinition).GetAwaiter().GetResult();
var consumers = types.FindTypes(TypeClassification.Concrete | TypeClassification.Closed).ToArray();
var internals = new List<Type>();
var externals = new List<Type>();
foreach (Type type in consumers)
{
if (type.HasInterface<IExternalConsumer>())
externals.Add(type);
else
internals.Add(type);
}
services.AddMediator(x =>
{
x.AddConsumers(internals.ToArray());
x.ConfigureMediator((provider, cfg) => cfg.UseFluentValidation());
});
services.AddMassTransit<IExternalBus>(x =>
{
x.AddConsumers(externals.ToArray());
x.AddBus(provider =>
Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(config.RabbitMQ.Address), h =>
{
h.Username(config.RabbitMQ.Username);
h.Password(config.RabbitMQ.Password);
});
cfg.ReceiveEndpoint(apiProviderName, ep => { ep.ConfigureConsumers(provider); });
}));
});
services.AddMassTransitHostedService();

最新更新