我对消费消息有清楚的了解:http://docs.masstransit-project.com/en/latest/usage/consage/consumer.html
这些实现一次仅处理一条消息。
我需要一次批量处理多个消息。
群众运输现在具有在批处理中处理单个消息的实验功能。
配置您的总线:
_massTransitBus = Bus.Factory.CreateUsingRabbitMq(
cfg =>
{
var host = cfg.Host(new Uri("amqp://@localhost"),
cfg =>
{
cfg.Username("");
cfg.Password("");
});
cfg.ReceiveEndpoint(
host,
"queuename",
e =>
{
e.PrefetchCount = 30;
e.Batch<MySingularEvent>(
ss =>
{
ss.MessageLimit = 30;
ss.TimeLimit = TimeSpan.FromMilliseconds(1000);
ss.Consumer(() => new BatchSingularEventConsumer());
});
});
});
并创建您的消费者:
public class BatchSingularEventConsumer: IConsumer<Batch<MySingularEvent>>
{
public Task Consume(ConsumeContext<Batch<MySingularEvent>> context)
{
Console.WriteLine($"Number of messages consumed {context.Message.Length}");
return Task.CompletedTask;
}
}
您可以使用消息限制和时间限制。
将批量配置我建议阅读克里斯·帕特森(Chris Patterson)有关批处理消息消耗的问题,尤其是有关预取的部分
批处理大小必须小于或等于任何预取计数或并发消息传递限制,以便达到尺寸限制。如果其他限制阻止了批处理大小达到达到批量,则永远不会被调用。
在MassTransit网站上也记录了批次消耗。
事实证明,今天您可以做到这一点:
public class MyConsumer : IConsumer<Batch<MyMessage>>
{
public async Task Consume(ConsumeContext<Batch<MyMessage>> context)
{
...
}
}