如何让RabbitMQ重新传递未确认的消息



我严格遵循以下教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html.

我这样启动RabbitMQ服务器:

docker pull rabbitmq
docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3

来自教程:

使用此代码,我们可以确保即使使用CTRL+C在处理消息时,不会丢失任何内容。很快工作人员死亡后,所有未确认的消息都将重新发送。

我生成了两个消费者,当我按住其中一个时,另一个运行的消费者不会接收最初发送给前一个消费者的消息。在CTRL+C’ing之后,我如何让其中一个消费者重新发送消息?

编辑:我现在正在通过"brew"安装RabbitMQ,但我仍然看到同样的问题。

brew update
brew install rabbitmq
/usr/local/sbin/rabbitmq-server &

没有必要在使用者代码中放入睡眠或类似的东西。在您提供的链接上,搜索以手动消息确认开头的段落,然后查看那里的代码。关键是不要确认信息。如果您将autoACK标志设置为true,那么您可以调用任何想要的消息,消息一收到就会得到确认。因此,只需不设置该标志,而且为了进行测试,您可以注释掉channel.basicAck(envelope.getDeliveryTag(), false);行,以便也不进行手动ACK。因此,当使用者退出时,消息仍将在队列中。

奇怪的是,RabbitMQ开箱即用。

第1步,我启动了RabbitMQ:

$ docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3
$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                   NAMES
e0c3257b8b49        rabbitmq:3          "docker-entrypoint.s…"   18 minutes ago      Up 14 minutes       4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp   my-rabbit

步骤2,我发布了一条消息(顺便说一句,我尝试使用Node.js。源代码见下面的附录):

$ node src/producer.js
Publisher:  TODO 1st

步骤3,我相继启动了两个消费者(出于测试目的,我的消费者被设计为不进行确认,因此RabbitMQ永远不会将消息出列)。

消费者1将收到该消息,而消费者2不会。

消费者1:

$ node src/consumer.js 
Consumer:  TODO 1st

消费者2:

$ node src/consumer.js 

步骤4,当我通过"Ctrl+c"停止使用者1时,使用者2将立即收到来自RabbitMQ:的消息

消费者2:

$ node src/consumer.js 
Consumer:  TODO 1st

结论:基本上,在设置使用者时,我们需要告诉RabbitMQ在收到使用者的确认之前不要将消息出列。因此,如果使用者1在有机会确认消息之前因任何原因被停止,RabbitMQ将重新向使用者2传递消息。

附录

src/producer.js

var q = 'tasks'
function bail (err) {
console.error(err)
process.exit(1)
}
// Publisher
function publisher (conn) {
conn.createChannel(onOpen)
function onOpen (err, ch) {
if (err != null) bail(err)
ch.assertQueue(q)
const msg = 'TODO 1st'
ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
console.log('Publisher: ', msg)
}
}
require('amqplib/callback_api')
.connect('amqp://guest:guest@localhost', function (err, conn) {
if (err != null) bail(err)
publisher(conn)
})

src/consumer.js

var q = 'tasks'
function bail (err) {
console.error(err)
process.exit(1)
}
// Consumer
function consumer (conn) {
conn.createChannel(onOpen)
function onOpen (err, ch) {
if (err != null) bail(err)
ch.assertQueue(q)
ch.consume(q, function (msg) {
if (msg !== null) {
console.log('Consumer: ', msg.content.toString())
// Commented out the line below, so RabbitMQ never dequeues a message
// ch.ack(msg)
}
}, { noAck: false })
}
}
require('amqplib/callback_api')
.connect('amqp://guest:guest@localhost', function (err, conn) {
if (err != null) bail(err)
consumer(conn)
})

最新更新