如何使用为使用者使用预取计数.NET RabbitMQ客户端



它在RabbitMQ文档中声明如下

"根据经验,在线程之间共享通道实例是需要避免的事情。应用程序应该更喜欢使用通道每个线程,而不是在多个线程之间共享同一个通道螺纹。">

目前,我们正在研究预取计数,建议如果您有少量的使用者,并且autoack=false,那么我们应该同时使用多条消息。然而,我们发现,如果消费者使用单个执行线程发回手动确认,则预取不会起作用。然而,如果我们将使用者处理封装在Task中,我们会发现预取计数确实很重要,并大大提高了使用者的性能。

请参阅以下示例,其中我们将消费者对消息的消费包装在Task对象中:

class Program
{
public static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "172.20.20.13",
UserName = "billy",
Password = "guest",
Port = 5671,
VirtualHost = "/",
Ssl = new SslOption
{
Enabled = true,
ServerName = "rabbit.blah.com",
Version = System.Security.Authentication.SslProtocols.Tls12
}
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.BasicQos(0, 100, false);
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var _result = new Task(() => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
System.Threading.Thread.Sleep(80);
channel.BasicAck(ea.DeliveryTag, false);
});
_result.Start();
};
channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}

我的问题是人们如何用实现消费者。NET rabbitmq客户端的预取计数?,你必须使用某种任务手动确认吗?,安全吗

您所参考的文档是针对Java客户端的。您应该参考此文档。

您使用的是的最新版本。NET客户端(5.1),因此在Received事件处理程序中进行工作不会阻塞其他处理TCP数据的线程,也不会阻塞心跳-这两种情况都很好。

首先,调用channel.BasicQos(0, 1, false)意味着您的消费者一次只能从RabbitMQ收到一条就绪消息,在调用BasicAck之前,不会传递另一条消息。所以,真的没有理由在另一个线程中做你的工作,因为你无论如何都不会收到另一条消息。

如果你增加了预取值(通过实验和运行基准测试),如果你的工作需要几毫秒以上的时间,你就必须在后台线程中完成你的工作。

当您在Received事件回调中进行工作时,它将阻塞用于进行回调的线程,因为回调不是在它自己的线程上执行的。因此,您可以确保您的工作非常短,也可以在另一个线程中完成工作。

我只是花了一些时间复习。NET客户端代码,我确信IModel实例不是线程安全的。如果增加预取,您将有机会同时确认多条消息,因此我建议实施一种使用该方法的解决方案,并确保在创建连接的同一线程上调用BasicAck


注意:RabbitMQ团队监控rabbitmq-users邮件列表,有时只回答StackOverflow上的问题

来源:https://www.rabbitmq.com/api-guide.html

当使用手动确认时,重要的是要考虑哪个线程进行确认。如果它与收到交付的线程(例如消费者#handleDelivery将传递处理委托给另一个线程),用确认将多个参数设置为true是不安全的,将导致双重确认,因此是信道级协议关闭通道的异常。在时间是安全的。

channel.basicAck(tag, false)是线程安全

但CCD_ 10不是。

RabbitMQ和通道Java线程安全中也提到了一些优点

最新更新