Masstransit RPC (RabbitMq) 吞吐量限制



我们正在将 Masstransit 与 RabbitMq 结合使用,将 RPC 从我们系统的一个组件制作到其他组件。

最近,我们面临着客户端吞吐量的限制,每秒测量了大约 80 个已完成的响应。

在尝试调查问题所在时,我发现 RPC 服务器快速处理请求,然后将响应放入回调队列,然后队列处理速度为 80 M\s

此限制仅在客户端。在同一台计算机上启动同一客户端应用程序的另一个进程会使服务器端的请求吞吐量翻倍,但随后我看到两个充满消息的回调队列正在以相同的 80 M \s 消耗

我们正在使用IBus的单个实例

builder.Register(c =>
{
var busSettings = c.Resolve<RabbitSettings>();
var busControl = MassTransitBus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(busSettings.Host), h =>
{
h.Username(busSettings.Username);
h.Password(busSettings.Password);
});
cfg.UseSerilog();
cfg.Send<IProcessorContext>(x =>
{
x.UseCorrelationId(context => context.Scope.CommandContext.CommandId);
});
}
);
return busControl;
})
.As<IBusControl>()
.As<IBus>()
.SingleInstance();

发送逻辑如下所示:

var busResponse = await _bus.Request<TRequest, TResult>(
destinationAddress: _settings.Host.GetServiceUrl<TCommand>(queueType),
message: commandContext,
cancellationToken: default(CancellationToken),
timeout: TimeSpan.FromSeconds(_settings.Timeout),
callback: p => { p.WithPriority(priority); });

有没有人遇到过这样的问题? 我猜响应调度逻辑中存在一些程序限制。它可能是最大线程池大小或缓冲区的大小,也是响应队列的预取计数。 我尝试使用 .Net 线程池大小,但没有任何帮助。

我是公共交通的新手,将不胜感激任何关于我的问题的帮助。 希望可以通过配置方式修复

您可以尝试一些方法来优化性能。我还建议查看 MassTransit-Benchmark 并在您的环境中运行它 - 这将使您了解代理的可能吞吐量。它允许您调整预取计数、并发等设置,以查看它们如何影响您的结果。

此外,我建议使用其中一个请求客户端来减少每个请求/响应的设置。例如,创建一次请求客户端,然后对每个请求使用相同的客户端。

var serviceUrl = yourMethodToGetIt<TRequest>(...); var client = Bus.CreateRequestClient<TRequest>(serviceUrl);

然后,在需要执行请求时使用该IRequestClient<TRequest>实例。

Response<Value> response = await client.GetResponse<TResponse>(new Request());

由于您只是使用 RPC,因此我强烈建议将接收终结点队列设置为非持久,以避免将 RPC 请求写入磁盘。并将总线预取计数调整为更高的值(比您可能拥有的最大并发请求数高 2 倍(,以确保响应始终直接传递给等待的响应使用者(这是 RabbitMQ 传递消息的内部因素(。

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => { cfg.PrefetchCount = 1000; }

最新更新