我有一个应用程序,它充当rabbitmq生产者。我已经应用了RPC方法,没有问题。生产者发布消息并在 replyQueues(临时队列)中使用其响应。首先,我使用QueueingConsumer进行生产者消费,并且我曾经将超时设置为nextDelivery(timeout)方法。QueueingConsumer 现已弃用,在 RabbitMQ 官方站点中,他们更改了他们的 RPC 教程,并且他们使用了 DefaultConsumer 而不是 QueueingConsumer。我也用 DefaultConsumer 替换了 QueueingConsumer。但是现在有一个问题:如何将超时设置为DefaultConsumer?因为如果消费者没有发送任何响应,垃圾临时队列将保留在代理中。新老生产者消费部分如下。感谢您的帮助。
老生产者消费方式:
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery deliver = consumer.nextDelivery(timeout);
if (deliver.getProperties().getCorrelationId().equals(corrId)) {
response = new String(deliver.getBody(), "UTF-8");
break;
}
}
return response;
新的生产者消费方法:
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
};
channel.basicConsume(replyQueueName, true, consumer);
return response.take();
它解决了。超时可以设置为"响应"对象。"新的生产者消费方法"的变化可能如下:
响应超时:必须使用response.poll(5000, TimeUnit.MILLISECONDS)
而不是response.take()
。