如果我有消息类型列表,如何在 MassTransit 中注册通用使用者适配器



我成功地将 MassTransit 用于一个愚蠢的示例应用程序,其中我从发布者控制台应用程序发布一条消息(事件),并在两个不同的消费者那里收到它,这两个消费者也是使用 RabbitMQ 的控制台应用程序。

这是整个示例项目 git 存储库:https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus

我想有一个包装 MassTransit 功能的项目,以便我的发布者和消费者项目对 MassTransit 一无所知。依赖项应朝这个方向发展:

  • DiDrDe.MessageBus ==> MassTransit
  • DiDrDe.MessageBus ==> DiDrDe.Contract
  • DiDrDe.Model ==> DiDrDe.Contract
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contract
  • DiDrDe.Publisher ==> DiDrDe.Model
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts(英语:DiDrDe.Contracts
  • )
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrDe.Model
  • DiDrDe.ConsumerTwo ==> DiDrDe.Contract
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

请注意,DiDrDe.MessageBus 对 DiDrDe.Model 一无所知,因为它是一个通用项目,应该对任何消息类型都有效。

为了实现这一点,我正在实现适配器模式,以便我的自定义接口IEventDtoBus(发布事件)和IEventDtoHandler<TEventDto>(使用事件)都是我的发布者和使用者都知道的。MassTransit 包装器项目(称为 DiDrDe.MessageBus)使用由IEventDtoBusEventDtoHandlerAdapter<TEventDto>组成的EventDtoBusAdapter实现适配器,作为我唯一的泛型IConsumer<TEventDto>IEventDtoHandler<TEventDto>组成

我遇到的问题是 MassTransit 要求注册消费者的方式,因为我的消费者是通用消费者,在编译时 MassTransit 包装器不应该知道其类型。

我需要找到一种方法,将EventDtoHandlerAdapter<TEventDto>注册为我在运行时传递的每个 TEventDto 类型的使用者(例如作为类型集合)。有关所有详细信息,请参阅我的存储库。

MassTransit 支持接受类型(好!正是我想要的)的重载方法,但它也需要第二个参数Func<type, object> consumerFactory,我不知道如何实现它。

更新 1问题是我无法注册这个通用消费者,例如:

consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();

因为我收到编译错误

严重性代码说明项目文件行抑制状态 错误 CS0310"事件Dto处理程序适配器"必须是 具有公共无参数构造函数的非抽象类型,以便 将其用作泛型类型或方法中的参数"TConsumer" 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action>)' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

更新2:我已经尝试了几件事,并且我已经更新了存储库中的项目。这些是我在MassTransit wrapper项目中的尝试。请注意,如果我向要处理的每条消息(事件)添加依赖项,我如何能够使一切正常工作。但我不想这样..我不希望这个项目知道它可以处理的消息。如果我能注册只知道消息类型的消费者就好了。

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
// DOES NOT WORK
//var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
//var eventDtoHandler = context.Resolve(typeEventDtoHandler);
//consumer.Consumer(eventDtoHandler);
// DOES NOT WORK
//consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
// DOES NOT WORK
//var consumerGenericType = typeof(IConsumer<>);
//var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
//consumer.Consumer(consumerThingHappenedType,  null);
});

更新3:按照伊戈尔的建议,我尝试执行以下操作:

var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
consumer.Consumer(adapterType, (Type x) => context.Resolve(x));

但是我收到运行时错误,说

请求的服务 'DiDrDe.MessageBus.EventDtoHandlerAdapter'1[[DiDrDe.Model.ThingHappen, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]' 尚未注册。若要避免此异常,请注册 提供服务的组件,请使用 检查服务注册 IsRegister(),或使用 ResolveOptional() 方法解析 可选依赖项。

我什至尝试将EventDtoHandlerAdapter<>单独注册为 IConsumer,以防这是问题所在但没有运气。

builder
.RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
.As(typeof(IConsumer<>))
.SingleInstance();

还带有:

builder
.RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
.AsSelf();

它告诉我

System.ObjectDisposedException:"此解析操作已 结束。使用 lambda 注册组件时, 无法存储 lambda 的 IComponentContext 'c' 参数。 相反,要么从"c"再次解析 IComponentContext,要么解析 基于<>工厂创建后续组件

澄清一下,我唯一需要注册的消费者是我的EventDtoHandlerAdapter<TEventDto>。它是通用的,因此基本上它将存在我支持的每个TEventDto的注册。问题是我事先不需要类型,所以我需要使用类型进行操作。

更新4:伊戈尔建议的新尝试。这次是用"代理"。我已经用最后一次尝试更新了我的回购以获取所有详细信息。我有我的使用者和标记接口:

public interface IEventDtoHandler
{
}
public interface IEventDtoHandler<in TEventDto>
: IEventDtoHandler
where TEventDto : IEventDto
{
Task HandleAsync(TEventDto eventDto);
}

我有一个对公共交通一无所知的消费者的实现:

public class ThingHappenedHandler
: IEventDtoHandler<ThingHappened>
{
public Task HandleAsync(ThingHappened eventDto)
{
Console.WriteLine($"Received {eventDto.Name} " +
$"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
return Task.CompletedTask;
}
}

现在我的"包装消费者"就是我所说的适配器,因为它知道MassTransit (它实现了MassTransitIConsumer)。

public class EventDtoHandlerAdapter<TConsumer, TEventDto>
: IConsumer<TEventDto>
where TConsumer : IEventDtoHandler<TEventDto>
where TEventDto : class, IEventDto
{
private readonly TConsumer _consumer;
public EventDtoHandlerAdapter(TConsumer consumer)
{
_consumer = consumer;
}
public async Task Consume(ConsumeContext<TEventDto> context)
{
await _consumer.HandleAsync(context.Message);
}
}

现在最后一步是在MassTransit注册我的"包装消费者"。但由于它是通用的,我不知道该怎么做。这就是问题所在。

我可以按照以下建议在Autofac中扫描和注册我的所有使用者类型:

var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(x => x.GetTypes())
.Where(x => interfaceType.IsAssignableFrom(x)
&& !x.IsInterface
&& !x.IsAbstract)
.ToList();

所以现在我拥有所有消费者类型(IEventDtoHandler的所有实现,包括我的ThingHappenedHandler)。现在怎么办?如何注册?

类似以下内容的内容不起作用

foreach (var consumerType in consumerTypes)
{
consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}

但我想它不起作用是正常的,因为我想注册的是我的EventDtoHandlerAdapter,这是真正的IConsumer

所以,我想我不明白你的建议。不好意思!

我需要的是这样的东西:

//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));

但是不使用 ThingHappen 模型,因为该模型不应该是已知的。这就是我仍然被困住的地方

更新5:克里斯·帕特森(Chris Patterson)建议的新尝试(他的解决方案已合并到我的存储库的master中),但问题仍然存在。

澄清一下,DiDrDe.MessageBus必须与任何发布者、消费者和模型无关。它应该只依赖于公共交通和DiDrDe.Contracts,Chris的解决方案有这样一行:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});

这与ThingHappened模型有直接关系。这是不允许的,它实际上与我已经拥有的解决方案没有太大区别,即:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
//THIS works, but it uses ThingHappened explicitly and I don't want that dependency
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});

抱歉,如果这还不够清楚,但DiDrDe.MessageBus最终将成为许多不同的使用者和发布者项目之间共享的nuGet包,它不应该依赖于任何特定的消息/模型。

更新6:问题已解决。非常感谢伊戈尔和克里斯的时间和帮助。 我已经将解决方案推送到我的存储库中掌握。

PS:不幸的是,当我在同一个消费者中有两个处理程序处理同一个事件时,这个解决方案有其局限性,因为似乎只有一个处理程序被执行(两次)。我希望两个处理程序都执行或只执行一个,但只执行一次(不是两次)。但这是另一个已经:)的主题

在此处查看自定义消费者约定:https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventionalhttps://github.com/MassTransit/MassTransit/blob/develop/tests/MassTransit.Tests/Conventional/ConventionConsumer_Specs.cs

创建你自己的IMyConsumerInterface,IMyMessageInterface将其插入该测试的代码中。 在创建总线之前ConsumerConvention.Register<CustomConsumerConvention>();注册它。它应该有效。

此外,您可以围绕使用者上下文创建自己的包装器,并将其与消息一起传递。

LoadFrom(MassTransit.AutofacIntegration)在自定义约定中对我不起作用,所以我不得不手动注册消费者

foreach (var consumer in consumerTypes)
cfg.Consumer(consumer, (Type x) => _container.Resolve(x));

或者,如果要使用"代理"方法,请执行以下操作:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
where TMessage : class, IMyMessageInterface 
where TConsumer : IMyConsumerInterface<TMessage>
{
private readonly TConsumer _consumer;
public WrapperConsumer(TConsumer consumer)
{
_consumer = consumer;
}
public Task Consume(ConsumeContext<TMessage> context)
{
return _consumer.Consume(context.Message);
}
}

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));

适用于这两种方法的附加代码
// marker interface
public interface IMyConsumerInterface
{
}
public interface IMyConsumerInterface<T> : IMyConsumerInterface
where T : IMyMessageInterface 
{
Task Consume(T message);
}
...
builder.RegisterAssemblyTypes(ThisAssembly)
.AssignableTo<IMyConsumerInterface>()
.AsSelf()
.As<IMyConsumerInterface>();
...
var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
.Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
.ToList();
**
回复:更新 5**
builder.Register(context =>
{
var ctx = context.Resolve<IComponentContext>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
foreach (var adapterType in adapterTypes)
consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
});
});
return busControl;
})

我为您的问题提交了一个对我有用的拉取请求,消费者开始时没有任何问题。

如果需要,您可以扩展它以包括所有使用者在其自己的端点上的自动注册。

诀窍是使用正确的泛型接口类型正确注册处理程序类型。有点类型的机制,但它对我有用。

最新更新