我有一个生产者,每当通过 API 调用收到消息时,它都会排队,并且我只想在确认消息被代理接收时才返回。
我通过出版商确认发现了如何做到这一点 -
using (var connection = factory.CreateConnection())
{
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchangeName, "topic", true, false, null);
//This enables producer confirm
channel.ConfirmSelect();
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, topic, properties, body);
channel.WaitForConfirms();
Console.WriteLine("I sent a message !", message);
}
}
我的问题是我不想等待所有确认,只想等待与此特定消息相关的确认。我不想将其限制为单个线程/工作线程,也不想等待所有内容都确认。
js 的 rabbit 库使用一个非常适合我使用的回调 - 但 C# 版本似乎不支持它。
我的问题是我不想等待所有确认,只想等待与此特定消息相关的确认。
您应该订阅BasicAcks
回调,并使用它来将确认与您发布的消息相关联。
我不想将其限制为单个线程/工作线程
您可以在线程之间共享该连接,但您必须创建一个每线程IModel
实例。
注意:RabbitMQ 团队监控rabbitmq-users
邮件列表,并且只偶尔回答 StackOverflow 上的问题。
您需要的是发布者确认和事务。
将要确认的消息分成单个(或子集(事务。
交易:
ch.txSelect(); <-- start transaction
ch.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_BASIC,
"nop".getBytes());
ch.txCommit();<--commit transaction
可以使用流式处理轻量级发布者确认,使用:
ch.setConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
unconfirmedSet.headSet(seqNo+1).clear();
} else {
unconfirmedSet.remove(seqNo);
}
}
public void handleNack(long seqNo, boolean multiple) {
// handle the lost messages somehow
}
我希望它有所帮助