如何为 ASP.NET 核心服务提供商编写 EasyNetQ 自动订户调度程序?



因此,EasyNetQ 自动订户有一个基本的默认调度程序,它无法使用非无参数构造函数创建消息使用者类。

若要查看实际操作,请创建具有所需依赖项的使用者。您可以设置自己的服务,也可以使用ILogger<T>,它由框架默认值自动注册。

消费短信.cs

public class ConsumeTextMessage : IConsume<TextMessage>
{
private readonly ILogger<ConsumeTextMessage> logger;
public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
{
this.logger = logger;
}
public void Consume(TextMessage message)
{
...
}
}

连接自动订阅者(就何时何地编写/运行此代码而言,这里有一些回旋余地)。

启动.cs

public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
}

(在其他地方,也许是startup.ConfigureBackgroundService)

var subscriber = new AutoSubscriber(bus, "example");
subscriber.Subscribe(Assembly.GetExecutingAssembly());

现在,启动程序并发布一些消息,您应该看到每条消息最终都出现在默认错误队列中。

System.MissingMethodException: No parameterless constructor defined for this object.
at System.RuntimeTypeHandle.CreateInstance(RuntimeType type, Boolean publicOnly, Boolean wrapExceptions, Boolean& canBeCached, RuntimeMethodHandleInternal& ctor)
at System.RuntimeType.CreateInstanceSlow(Boolean publicOnly, Boolean wrapExceptions, Boolean skipCheckThis, Boolean fillCache)
at System.Activator.CreateInstance[T]()
at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.DispatchAsync[TMessage,TAsyncConsumer](TMessage message)
at EasyNetQ.Consumer.HandlerRunner.InvokeUserMessageHandlerInternalAsync(ConsumerExecutionContext context)

我知道我可以提供自己的调度程序,但是我们如何与 ASP.NET Core服务提供商合作;确保它适用于作用域服务?

所以,这就是我想出的。

public class MessageDispatcher : IAutoSubscriberMessageDispatcher
{
private readonly IServiceProvider provider;
public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}
public void Dispatch<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsume<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}
}
public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}
}

几个值得注意的点...

IServiceProvider依赖项是 ASP.NET 核心 DI 容器。起初可能不清楚,因为在整个Startup.ConfigureServices()中,您正在使用其他接口IServiceCollection.

public MessageDispatcher(IServiceProvider provider)
{
this.provider = provider;
}

为了解析作用域内服务,需要围绕创建和使用使用者创建和管理作用域的生命周期。我正在使用GetRequiredService<T>扩展方法,因为我真的想要一个讨厌的异常,而不是一个可能在我们注意到它之前泄漏一段时间的空引用(以空引用异常的形式)。

using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
consumer.Consume(message);
}

如果仅直接使用provider,如provider.GetRequiredService<T>()中所述,则在尝试解析作用域使用者或使用者的作用域依赖项时会看到这样的错误。

引发异常:"System.InvalidOperationException"Microsoft.Extensions.DependencyInjection.dll:"无法从根提供程序解析作用域服务'Example.Messages.ConsumptionTextMessage'。

为了解析作用域服务并为异步使用者正确维护生命周期,您需要在正确的位置获取 async/await 关键字。应等待ConsumeAsync调用,这要求该方法是异步的。在等待行和使用者中使用断点,并逐行分步,以更好地处理此问题!

public async Task DispatchAsync<TMessage, TConsumer>(TMessage message)
where TMessage : class
where TConsumer : class, IConsumeAsync<TMessage>
{
using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}
}

好的,现在我们有了调度程序,我们只需要在 Startup 中正确设置所有内容。我们需要从提供程序解析调度程序,以便提供程序可以正确提供自身。这只是执行此操作的一种方式。

启动.cs

public void ConfigureServices(IServiceCollection services)
{
// messaging
services.AddSingleton<IBus>(RabbitHutch.CreateBus("host=localhost"));
services.AddSingleton<MessageDispatcher>();
services.AddSingleton<AutoSubscriber>(provider =>
{
var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "example")
{
AutoSubscriberMessageDispatcher = provider.GetRequiredService<MessageDispatcher>();
}
});
// message handlers
services.AddScoped<ConsumeTextMessage>();
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
app.ApplicationServices.GetRequiredServices<AutoSubscriber>().SubscribeAsync(Assembly.GetExecutingAssembly());
}

我从上面的回复中复制了代码,但它对我不起作用。它会导致注入的数据库上下文(作用域)重用同一实例时出现问题。

只是一个小小的变化:

而不是:

using(var scope = provider.CreateScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}

在 MessageDispatcher 中,我需要使用CreateAsyncScope 来实现 DispatchAsync方法:

using(var scope = provider.CreateAsyncScope())
{
var consumer = scope.ServiceProvider.GetRequiredService<TConsumer>();
await consumer.ConsumeAsync(message);
}

最新更新