我在Java上使用RabbitMQ(和Celery),这是我根据我正在阅读的教程从RabbitMQ获取消息的代码:
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
但是我只在任务开始时收到一条消息 - 当我想在任务完成时收到一条消息时。有什么帮助吗?
你不应该使用QueueingConsumer,因为它被认为是不推荐使用的,如下所述:https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/QueueingConsumer.html
相反,您应该创建自己的使用者来实现来自 RabbitMQ 库的接口Consumer
。您必须实现一种称为handleDelivery
的方法,每次收到消息时都会调用该方法。然后,要启动它,您需要调用 channel.basicConsume(QUEUE_NAME, true, consumer)
.
例:
channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
//your code here
}
});