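I'm currently porting a project to .NET 5, and I've been struggling with the MassTransit and Quartz scheduler configuration.
Unscheduled messages work fine, but scheduled messages do not.
I can see the scheduled jobs in the Quartz DB, but when a job is triggered automatically, I get the following error: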
Quartz.SchedulerException: Problem instantiating class 'MassTransit.QuartzIntegration.ScheduledMessageJob: Cannot instantiate type which has no empty constructor (Parameter 'ScheduledMessageJob')'
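The project consists of:
- a web application, which sends messages and scheduled messages.
- a Windows service (background service), which should consume messages and also send (unscheduled) messages.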
Web app
In Startup.cs I have:
NameValueCollection properties = new();
properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
properties.Add("quartz.threadPool.threadCount", "10");
properties.Add("quartz.threadPool.threadPriority", "2");
properties.Add("quartz.jobStore.misfireThreshold", "60000");
properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
properties.Add("quartz.jobStore.useProperties", "true");
properties.Add("quartz.jobStore.dataSource", "default");
properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
properties.Add("quartz.dataSource.default.provider", "SqlServer");
properties.Add("quartz.serializer.type", "json");
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
var scheduler = factory.GetScheduler().Result;
services.AddSingleton(scheduler);
services.AddMassTransit(x =>
{
    x.AddMessageScheduler(busSettings.QuartzQueueAddress);
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, host =>
        {
            host.Username(busSettings.RabbitMQLogin);
            host.Password(busSettings.RabbitMQPassword);
        });
        cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
        cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService();
Unscheduled messages are sent like this:
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
var target = await _bus.GetSendEndpoint(_busSettings.ReminderRequestQueueAddress);
await target.Send(message);
And a scheduled one is sent like this:
var message = new BuildReminderDeploySetRequest(Guid.NewGuid(), ... other parameters...);
await _scheduler.ScheduleSend(_busSettings.ReminderRequestQueueAddress, reminderDate, message);
Both are injected via DI: IBusControl _bus for unscheduled messages and IMessageScheduler _scheduler for scheduled messages.
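For readers who want to see the two send paths side by side, here is a minimal sketch of a class that receives both dependencies through its constructor. The names ReminderRequested and ReminderSender are hypothetical stand-ins (the real message type is BuildReminderDeploySetRequest, whose parameters are not shown above), and the queue address is passed in by the caller rather than read from the settings object.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract, standing in for BuildReminderDeploySetRequest.
public class ReminderRequested
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }
}

// Hypothetical sender class; both dependencies come from the container
// once AddMassTransit(...) and AddMessageScheduler(...) have been called.
public class ReminderSender
{
    private readonly IBusControl _bus;
    private readonly IMessageScheduler _scheduler;

    public ReminderSender(IBusControl bus, IMessageScheduler scheduler)
    {
        _bus = bus;
        _scheduler = scheduler;
    }

    // Unscheduled: resolve the send endpoint and send straight to the queue.
    public async Task SendNow(Uri queueAddress, ReminderRequested message)
    {
        ISendEndpoint endpoint = await _bus.GetSendEndpoint(queueAddress);
        await endpoint.Send(message);
    }

    // Scheduled: hand the message to the scheduler, which is expected to
    // deliver it to queueAddress at reminderDate.
    public Task SendLater(Uri queueAddress, DateTime reminderDate, ReminderRequested message)
    {
        return _scheduler.ScheduleSend(queueAddress, reminderDate, message);
    }
}
```

Only the second path involves Quartz, which matches the symptom described here: direct sends work while scheduled ones fail at trigger time.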
Windows service
In Program.cs I have:
services.AddTransient<ProcessBuildReminderDeploySetConsumer>();
NameValueCollection properties = new();
properties.Add("quartz.scheduler.instanceName", "QuartzSchedulerInstance1");
properties.Add("quartz.scheduler.instanceId", "Quartz_instance_1");
properties.Add("quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz");
properties.Add("quartz.threadPool.threadCount", "10");
properties.Add("quartz.threadPool.threadPriority", "2");
properties.Add("quartz.jobStore.misfireThreshold", "60000");
properties.Add("quartz.jobStore.type", "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz");
properties.Add("quartz.jobStore.useProperties", "true");
properties.Add("quartz.jobStore.dataSource", "default");
properties.Add("quartz.jobStore.tablePrefix", "QRTZ_");
properties.Add("quartz.jobStore.lockHandler.type", "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz");
properties.Add("quartz.dataSource.default.connectionString", "Server=(local);Database=Quartz;Trusted_Connection=True;");
properties.Add("quartz.dataSource.default.provider", "SqlServer");
properties.Add("quartz.serializer.type", "json");
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
var scheduler = factory.GetScheduler().Result;
scheduler.Start();
services.AddSingleton(scheduler);
services.AddMassTransit(conf =>
{
    conf.AddMessageScheduler(busSettings.QuartzQueueAddress);
    conf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, host =>
        {
            host.Username(busSettings.RabbitMQLogin);
            host.Password(busSettings.RabbitMQPassword);
        });
        cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
        cfg.ReceiveEndpoint(busSettings.QuartzQueueName, epc =>
        {
            var partitioner = cfg.CreatePartitioner(16);
            epc.Consumer(
                () => new ScheduleMessageConsumer(scheduler),
                x => x.Message<ScheduleMessage>(
                    m => m.UsePartitioner(partitioner, p => p.Message.CorrelationId)
                )
            );
            epc.Consumer(
                () => new CancelScheduledMessageConsumer(scheduler),
                x => x.Message<CancelScheduledMessage>(
                    m => m.UsePartitioner(partitioner, p => p.Message.TokenId)
                )
            );
            cfg.UseMessageScheduler(epc.InputAddress);
            cfg.Durable = true;
            // THIS USED TO BE IN THE OLD .NET 4.6 PROJECT
            // BUT THOSE CLASSES DO NOT EXIST ANYMORE
            //var specification =
            //    new SchedulerBusFactorySpecification(scheduler, epc.InputAddress);
            //cfg.AddBusFactorySpecification(specification);
        });
        cfg.ReceiveEndpoint(busSettings.ReminderRequestQueueName, e =>
        {
            // Enforce loading only 1 message at a time for now
            e.PrefetchCount = 1;
            e.UseMessageRetry(r =>
            {
                r.Interval(retryCount: 5, interval: TimeSpan.FromMilliseconds(500));
            });
            e.Consumer<ProcessBuildReminderDeploySetConsumer>(context);
        });
        cfg.UseMessageScheduler(busSettings.QuartzQueueAddress);
        cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService();
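The service has no problem with messages sent directly to the queue. As for the scheduled messages, it cannot consume them because Quartz never fires them correctly.
I'm sure some configuration problem is causing this error, but between the various ways of configuring this and the changes from one version to the next, I can't work out what it is.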
With the current version of Quartz, there are extension methods for configuring the service collection. These methods add ISchedulerFactory to the container.
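As a rough sketch of what that registration might look like, the snippet below mirrors the main settings from the question's property list using the builder-style API. It assumes the Quartz.Extensions.DependencyInjection and Quartz.Serialization.Json packages are referenced. The class and method names (QuartzRegistration, AddPersistentQuartz) are made up for illustration, and not every property from the question is carried over, so treat it as a starting point rather than an exact equivalent.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Quartz;

// Hypothetical helper; the name is illustrative only.
public static class QuartzRegistration
{
    public static IServiceCollection AddPersistentQuartz(
        this IServiceCollection services, string connectionString)
    {
        // AddQuartz registers ISchedulerFactory in the container.
        services.AddQuartz(q =>
        {
            q.SchedulerName = "QuartzSchedulerInstance1";
            q.SchedulerId = "Quartz_instance_1";

            // Roughly corresponds to quartz.threadPool.threadCount = 10.
            q.UseDefaultThreadPool(tp => tp.MaxConcurrency = 10);

            // ADO.NET job store on SQL Server, as in the question.
            q.UsePersistentStore(store =>
            {
                store.UseProperties = true;
                store.UseSqlServer(sqlServer =>
                {
                    sqlServer.ConnectionString = connectionString;
                    sqlServer.TablePrefix = "QRTZ_";
                });
                // Provided by the Quartz.Serialization.Json package.
                store.UseJsonSerializer();
            });
        });

        return services;
    }
}
```

It would be called as services.AddPersistentQuartz("Server=(local);Database=Quartz;Trusted_Connection=True;") before AddMassTransit, replacing the hand-built StdSchedulerFactory and the AddSingleton(scheduler) registration.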
Then, in the process where you want to host Quartz, you can use a non-obvious extension method to configure it in MassTransit:
services.AddMassTransit(conf =>
{
    conf.AddMessageScheduler(busSettings.QuartzQueueAddress);
    conf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(busSettings.RabbitMQAddress, host =>
        {
            host.Username(busSettings.RabbitMQLogin);
            host.Password(busSettings.RabbitMQPassword);
        });
        cfg.UseInMemoryScheduler(context, busSettings.QuartzQueueName);
    });
});
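This uses the scheduler factory configured with Quartz (which, obviously, you would configure so that it is not in-memory). It also configures a bus observer that automatically starts and stops Quartz when the bus starts and stops.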