使用发布者向RabbitMQ确认,在哪些情况下,发布者会收到成功/失败的通知



引用这本书,深度RabbitMQ:

当发布者收到消息时,将向其发送Basic.Ack请求已发布的已被所有上的消费者应用程序直接消费将消息路由到的队列或将消息排入队列并持久化的队列如果要求。

Has been directly consumed混淆,这是否意味着当消费者将ack发送给代理发布者时,将通知消费者成功处理消息?或者这意味着当消费者刚刚从队列中接收到消息时,发布者将得到通知?

or that the message was enqueued and persisted if requested。这是一种魔术,还是当这两种情况发生时,出版商会被告知?(在这种情况下,出版商将被通知两次)

使用node.jsamqplib想要检查实际发生了什么:

// consumer.js
amqp.connect(...)
.then(connection => connection.createChannel())
.then(() => { assert exchange here })
.then(() => { assert queue here })
.then(() => { bind queue and exchange here })
.then(() => {
channel.consume(QUEUE, (message) => {
console.log('Raw RabbitMQ message received', message)
// Simulate some job to do
setTimeout(() => {
channel.ack(message, false)
}, 5000})
}, { noAck: false })
})
// publisher.js
amqp.connect(...)
.then(connection => connection.createConfirmChannel())
.then(() => { assert exchange here })
.then(() => {
channel.publish(exchange, routingKey, new Buffer(...),{}, (err, ok) => {
if (err) {
console.log('Error from handling confirmation on publisher side', err)
} else {
console.log('From handling confirmation on publisher side', ok)
}
})
})

运行该示例,我可以看到以下日志:

From handling confirmation on publisher side undefined
Raw RabbitMQ message received
Time to ack the message

据我所知,至少通过这个日志,发布者只有在消息排队时才会收到通知(因此,让消费者ack接收消息不会以任何方式影响发布者)

进一步报价:

如果消息无法路由,代理将发送Basic.Nack RPC指示失败的请求。然后由出版商决定决定如何处理消息。

更改上面的示例,其中我只将消息的路由密钥更改为不应在任何地方路由的东西(没有与路由密钥匹配的绑定),从日志中我只能看到下面的。

From handling confirmation on publisher side undefined

现在我更困惑了,这里到底通知了哪个出版商?我理解它会收到一个错误,比如Can't route anywhere,它将与上面的报价对齐。但正如你所看到的,err并没有被定义,作为次要问题,即使amqplib在他们的官方文档中使用(err, ok),在任何情况下我都看不到这些定义。所以这里的输出和上面的例子一样,上面的例子和不可路由的消息有什么不同。

那个么,我在做什么呢,什么时候出版商会收到关于消息发生了什么的通知呢?有没有使用PublisherConfirms的具体例子?从上面的日志记录中,我可以得出结论,在您希望100%确定消息已排队的情况下,使用它是很好的。

经过一次又一次的搜索,我找到了这个http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/

基本规则如下:

  1. 不可路由的强制或即时消息在basic.return之后立即得到确认
  2. 瞬态消息在排队时得到确认
  3. 持久消息在持久化到磁盘或在每个队列上使用时得到确认

如果满足这些条件中的多个,则只有第一个条件会导致确认发送。每个已发布的消息都会尽快得到确认或者稍后,并且没有消息将被确认一次以上。

默认情况下,发布者对消费者一无所知。

PublisherConfirms用于检查消息是否到达代理,但不检查消息是否已入队。

可以使用mandatory标志来确保消息已被路由看看这个https://www.rabbitmq.com/reliability.html

为了确保消息被路由到单个已知队列,生产者可以只声明一个目标队列并直接发布到它。如果消息可能以更复杂的方式路由,但生产者仍然需要知道他们是否到达了至少一个队列,它可以设置basic.publish上的强制标志,确保basic.revert(包含回复代码和一些文本解释)将被发送如果没有适当绑定队列,则返回到客户端。

我不完全确定ack/nack问题的通知,但请查看BunnyBus Node库以获得更简单的api和RabbitMQ管理:)

https://github.com/xogroup/bunnybus

const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus({
user: 'your-user',
vhost: 'your-vhost', // cloudamqp defaults vhost to the username
password: 'your-password',
server: 'your.server.com'
});
const handler = {
'test.event': (message, ack) => {
// Do your work here.
// acknowledge the message off of the bus.
return ack();
}
};
// Create exchange and queue if they do not already exist and then auto connect.
return bunnyBus.subscribe('test', handler)
.then(() => {
return bunnyBus.publish({event: 'test.event', body: 'here's the thing.'});
})
.catch(console.log);