Can you use scoped filters with batch consumers in MassTransit?



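I'm trying to use a scoped filter with a batch consumer, but I get an exception at startup. I have tested the batch consumer and the filter in isolation, and each works fine on its own.

Main method: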

static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .UseServiceProviderFactory(new AutofacServiceProviderFactory())
        .ConfigureContainer<ContainerBuilder>((hostBuilderContext, builder) =>
        {
            // scoped filter
            builder.RegisterGeneric(typeof(MyFilter<>)).InstancePerLifetimeScope();
            builder.AddMassTransit(configurator =>
            {
                configurator.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Durable = true;
                    cfg.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });

                    cfg.UseConsumeFilter(typeof(MyFilter<>), context);

                    cfg.ReceiveEndpoint("hello-queue", endpointConfigurator =>
                    {
                        endpointConfigurator.Batch<SayHello>(b =>
                        {
                            b.MessageLimit = 100;
                            b.ConcurrencyLimit = 10;
                            b.TimeLimit = TimeSpan.FromSeconds(1);
                            b.Consumer(() => new BatchConsumer());
                        });
                    });
                });
            });
        })
        .ConfigureServices((hostContext, services) =>
        {
            services.AddHostedService<HostedService>();
        });
    await host.Build().RunAsync();
}

Exception:

Autofac.Core.DependencyResolutionException: 'An exception was thrown while activating λ:Microsoft.Extensions.HostedService[] -> CommandProcessor.HostedService -> λ:MassTransit.IBusControl -> λ:MassTransit.Registration.IBusInstance.'

Stack trace:

at Autofac.Core.Resolving.InstanceLookup.CreateInstance(IEnumerable`1 parameters)
at Autofac.Core.Resolving.InstanceLookup.Execute()
at Autofac.Core.Resolving.ResolveOperation.GetOrCreateInstance(ISharingLifetimeScope currentOperationScope, ResolveRequest request)
at Autofac.Core.Resolving.ResolveOperation.ResolveComponent(ResolveRequest request)
at Autofac.Core.Resolving.ResolveOperation.Execute(ResolveRequest request)
at Autofac.Core.Lifetime.LifetimeScope.ResolveComponent(ResolveRequest request)
at Autofac.ResolutionExtensions.TryResolveService(IComponentContext context, Service service, IEnumerable`1 parameters, Object& instance)
at Autofac.ResolutionExtensions.ResolveOptionalService(IComponentContext context, Service service, IEnumerable`1 parameters)
at Autofac.ResolutionExtensions.ResolveOptional(IComponentContext context, Type serviceType, IEnumerable`1 parameters)
at Autofac.ResolutionExtensions.ResolveOptional(IComponentContext context, Type serviceType)
at Autofac.Extensions.DependencyInjection.AutofacServiceProvider.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetService[T](IServiceProvider provider)
at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>d__9.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.<RunAsync>d__4.MoveNext()
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at CommandProcessor.Program.<Main>d__0.MoveNext() 

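I cleaned up your code so that it uses the proper batch configuration, resolves the consumer from the container, and configures the receive endpoint properly.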

static async Task Main(string[] args)
{
    var host = new HostBuilder()
        .UseServiceProviderFactory(new AutofacServiceProviderFactory())
        .ConfigureContainer<ContainerBuilder>((hostBuilderContext, builder) =>
        {
            // scoped filter
            builder.RegisterGeneric(typeof(MyFilter<>)).InstancePerLifetimeScope();

            builder.AddMassTransit(configurator =>
            {
                configurator.AddConsumer<BatchConsumer, BatchConsumerDefinition>();
                configurator.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
                    cfg.ReceiveEndpoint("hello-queue", endpointConfigurator =>
                    {
                        endpointConfigurator.UseConsumeFilter(typeof(MyFilter<>), context);
                        endpointConfigurator.ConfigureConsumer<BatchConsumer>(context);
                    });
                });
            });
        })
        .ConfigureServices((hostContext, services) =>
        {
            services.AddHostedService<HostedService>();
        });
    await host.Build().RunAsync();
}

class BatchConsumerDefinition :
    ConsumerDefinition<BatchConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<BatchConsumer> consumerConfigurator)
    {
        consumerConfigurator.Options<BatchOptions>(options => options
            .SetMessageLimit(100)
            .SetTimeLimit(1000)
            .SetConcurrencyLimit(10));
    }
}

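As far as I can tell, the scoped filter should work. The question is whether you want the scope per message or per batch; the latter may require moving the filter directly onto the consumer configurator for the Batch<T>.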
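Neither MyFilter<T> nor BatchConsumer is shown in the question, so the following is only a sketch of the shapes they would typically have. It is not the asker's actual code. The SayHello class, the filter body, and the probe scope name "myFilter" are assumptions made for illustration. The using directives assume MassTransit v7, where the pipeline types live in GreenPipes; on v8 that directive would be dropped.

```csharp
using System.Threading.Tasks;
using GreenPipes;   // assumption: MassTransit v7; in v8 these types are in the MassTransit namespace
using MassTransit;

// Hypothetical message contract; the question references SayHello but does not show it.
public class SayHello
{
}

// Generic consume filter. T is whatever message type the pipeline is carrying
// at the point where the filter is attached.
public class MyFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Work that needs scoped dependencies would go here, before and after
        // handing the context to the rest of the pipeline.
        await next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("myFilter");
    }
}

// A batch consumer receives a single Batch<SayHello> message
// that wraps the individual SayHello consume contexts.
public class BatchConsumer :
    IConsumer<Batch<SayHello>>
{
    public Task Consume(ConsumeContext<Batch<SayHello>> context)
    {
        foreach (ConsumeContext<SayHello> item in context.Message)
        {
            SayHello message = item.Message;
            // Process each message in the batch here.
        }

        return Task.CompletedTask;
    }
}
```

Written this way, the per-message versus per-batch distinction becomes concrete: the filter's T is either SayHello or Batch<SayHello>, depending on where in the pipeline it is attached.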

Also, a consumer definition is used to specify the batch options (you were using the legacy syntax, which is not recommended).
