MassTransit RabbitMq调度消息未发送给消费者



我想使用MassTransit+RabbitMq总线来调度来自总线的消息。我编写了两个C#控制台应用程序,一个用于消息创建者并将消息发送到scheduler,另一个用于信息使用者。

下面的代码用于在总线中进行调度,以便每秒向调度器发送一条消息,然后调度器以10秒的延迟向消费者发送消息。我的问题是没有消息发送到rabbitMq客户端中的使用者或使用者队列。我的错误在哪里?

注意:UseInMemoryScheduler工作正常,但UseMessageScheduler不工作

总线消息创建者

class Program
{
public static void Main(string[] args)
{
MainAsync(args).GetAwaiter().GetResult();
Console.ReadKey();
}
static async Task MainAsync(string[] args)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Username("guest");
settings.Password("guest");
});
//rabbit.UseInMemoryScheduler(); // This works
rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));// This doesn't work,
});
busControl.Start();
var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));
for (int i = 0; i < 1000000; i++)
{
await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"), 
DateTime.Now.AddSeconds(10), 
new MessageCreated()
{
Text = $"message {i}"
});
Thread.Sleep(1000);
}
Console.ReadKey();
busControl.Stop();
}
}

消息消费者。

class Program
{
static void Main(string[] args)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
var host = rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Password("guest");
settings.Username("guest");
});
rabbit.ReceiveEndpoint(host, "publisher", conf =>
{
conf.Consumer<Consumer>();
});
});
busControl.Start();
Console.ReadKey();
busControl.Stop();
}
}
public class Consumer : IConsumer<MessageCreated>
{
public Task Consume(ConsumeContext<MessageCreated> context)
{
MessageCreated message = context.Message;
Console.WriteLine(message.Text);
context.Publish(new MessagePublished
{
Text = message.Text,
});
return Task.FromResult(context.Message);
}
}

更新根据@maldworth的回答,我更改了以下代码。但问题并没有解决。

class Program
{
public static void Main(string[] args)
{
MainAsync(args).GetAwaiter().GetResult();
Console.ReadKey();
}
private static async Task<IScheduler> CreateSchedulerAsync()
{
var schedulerFactory = new StdSchedulerFactory();
var scheduler = await schedulerFactory.GetScheduler();
return scheduler;
}
static async Task MainAsync(string[] args)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(async cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost:5672"), settings =>
{
settings.Password("guest");
settings.Username("guest");
});
var scheduler = await CreateSchedulerAsync();
cfg.ReceiveEndpoint("quartz", e =>
{
cfg.UseMessageScheduler(e.InputAddress);
e.Consumer(() => new ScheduleMessageConsumer(scheduler));
e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
});
cfg.ReceiveEndpoint(host, "publisher", conf =>
{
conf.Consumer<PublisherConsumer>();
});
cfg.ReceiveEndpoint(host, "subscriber", conf =>
{
conf.Consumer<SubscriberConsumer>();
});
});
busControl.Start();
for (int i = 0; i < 1000000; i++)
{
var text = $"message {i}";
Console.WriteLine($"Schedule: {text}");
await busControl.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),
DateTime.Now.AddSeconds(30),
new ScheduleMessage()
{
Text = text
});
Thread.Sleep(10000);
}
Console.ReadKey();
busControl.Stop();
}
}
public class PublisherConsumer : IConsumer<ScheduleMessage>
{
public Task Consume(ConsumeContext<ScheduleMessage> context)
{
Console.WriteLine($"In Publisher: {context.Message.Text}");
context.Publish(new PublishMessage
{
Text = context.Message.Text,
});
return Task.FromResult(context.Message);
}
}
public class SubscriberConsumer : IConsumer<PublishMessage>
{
public Task Consume(ConsumeContext<PublishMessage> context)
{
Console.WriteLine($"In Subscriber: {context.Message.Text}");
return Task.FromResult(context.Message);
}
}

App.config文件内容为:

<configSections>
<section name="quartz" type="System.Configuration.NameValueSectionHandler, System, Version=1.0.5000.0,Culture=neutral, PublicKeyToken=b77a5c561934e089" />
</configSections>
<startup> 
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.2" />
</startup>
<quartz>
<add key="quartz.scheduler.instanceName" value="MassTransit-Quartz" />
<add key="quartz.scheduler.instanceId" value="AUTO" />
<add key="quartz.threadPool.type" value="Quartz.Simpl.SimpleThreadPool, Quartz" />
<add key="quartz.threadPool.threadCount" value="4" />
<add key="quartz.jobStore.misfireThreshold" value="60000" />
<add key="quartz.serializer.type" value="binary" />
<add key="quartz.jobStore.type" value="Quartz.Impl.AdoJobStore.JobStoreTX, Quartz" />
<add key="quartz.jobStore.useProperties" value="false" />
<add key="quartz.jobStore.driverDelegateType" value="Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz" />
<add key="quartz.jobStore.clustered" value="true" />
<add key="quartz.jobStore.tablePrefix" value="QRTZ_" />
<add key="quartz.jobStore.dataSource" value="quartzDS" />
<add key="quartz.dataSource.quartzDS.connectionString" value="Server=.;Database=QuartzDB;Integrated Security=SSPI" />
<add key="quartz.dataSource.quartzDS.provider" value="SqlServer" />
</quartz>

您就快到了,只需更新以下逻辑即可:

busControl.Start();
scheduler.JobFactory = new MassTransitJobFactory(busControl);
scheduler.Start().Wait();
Console.ReadKey();
busControl.Stop();

所以您没有提到是否有第三个消费者在运行(在第三个控制台应用程序中(。为了安排使用Quartz,您需要专门针对Quartz的第三消费者。它必须运行,在这种情况下,石英接收端点将侦听"石英"队列。

[更新]

以下是您想要的第三控制台应用程序(石英服务(的配置示例:

var scheduler = CreateScheduler();
configurator.ReceiveEndpoint("quartz", e =>
{
configurator.UseMessageScheduler(e.InputAddress);
e.Consumer(() => new ScheduleMessageConsumer(scheduler));
e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
});
...
private static IScheduler CreateScheduler()
{
ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
var scheduler = schedulerFactory.GetScheduler();
return scheduler;
}

您还需要为我们配置石英存储(如果您想在内存中进行测试,请使用SQLite、MSSql、RAM(。请参阅此处的配置示例。

[/Updated]

[Updated2]

有人在群里发布了类似的问题。幸运的是,他们提供了一个带有一堆不同MT功能的示例github,其中之一就是单独的调度器。请看一看,那里应该有你需要的一切。

[Updated2]

如果你想在不运行全吹石英调度器的情况下进行测试,那么你可以使用InMemory调度器。但这只是为了测试目的,您不应该在生产中使用它。

此外,在您的第一个代码片段中,您不需要获得调度程序的发送端点:var sendEndpoint = await busControl.GetSendEndpoint(new Uri("rabbitmq://localhost/quartz"));

因为在总线配置中,rabbit.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));表示无论何时调用ScheduleSend(来自ConsumerContext或IBus/IBusControl(,它都将始终使用该/quartz地址。

最后,这行await sendEndpoint.ScheduleSend(new Uri("rabbitmq://localhost/publisher"),,然后可以更改为busControl.ScheduleSend(...)

最新更新