RabbitMQ direct reply-to: I get an AlreadyClosedException



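Solved: move

channel.basicPublish("", QUEUE, props, message.getBytes());

below

channel.basicConsume(replyQueue, …)

That fixed the problem. With direct reply-to, the consumer on amq.rabbitmq.reply-to must already be registered, in automatic-acknowledgement mode and on the same channel, before the request is published.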


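I am trying to figure out how to use RabbitMQ's direct reply-to feature. The documentation is rather vague about how to implement it, so I took the RPC example and adapted it to use direct reply-to.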

private final static String QUEUE = "Test_chan";

private void directReplyToClient(ConnectionFactory factory) {
    Connection connection = null;
    Channel channel = null;
    String replyQueue;
    try {
        connection = factory.newConnection();
        channel = connection.createChannel();
        //replyQueue = channel.queueDeclare().getQueue();
        replyQueue = "amq.rabbitmq.reply-to";
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .replyTo(replyQueue)
                .build();
        String message = "Hello World";
        channel.basicPublish("", QUEUE, props, message.getBytes());
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                response.offer(new String(body, "UTF-8"));
            }
        });
        System.out.println(response.take());
    } catch (IOException | TimeoutException | InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            if (channel != null)
                channel.close();
            if (connection != null)
                connection.close();
        } catch (IOException | TimeoutException _ignore) {}
    }
}

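Setting the reply address to

channel.queueDeclare().getQueue()

works, but setting it to

amq.rabbitmq.reply-to

gives the following exception:

Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - fast reply consumer does not exist, class-id=60, method-id=40)

Can anyone see what I am doing wrong? Any suggestions would be appreciated.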

Here is the code for the solution. Call basicConsume before basicPublish.

private final static String QUEUE = "Test_chan";

private void directReplyToProducer(ConnectionFactory factory) {
    Connection connection = null;
    Channel channel = null;
    String replyQueue;
    try {
        connection = factory.newConnection();
        channel = connection.createChannel();
        replyQueue = "amq.rabbitmq.reply-to";
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .replyTo(replyQueue)
                .build();
        String message = "Hello World";
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        // Register the reply consumer first (autoAck = true), on the same channel used to publish.
        channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
                response.offer(new String(body, "UTF-8"));
            }
        });
        // Publish only after the consumer exists; otherwise the broker closes the channel with 406.
        channel.basicPublish("", QUEUE, props, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        System.out.println(response.take());
        Thread.sleep(10000);
    } catch (IOException | TimeoutException | InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            if (channel != null)
                channel.close();
            if (connection != null)
                connection.close();
        } catch (IOException | TimeoutException _ignore) {}
    }
}
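
One thing to watch with this approach: response.take() waits indefinitely, so the client hangs if nothing ever answers on QUEUE. Below is a minimal sketch of a bounded wait that uses only the JDK. ReplyWaiter and awaitReply are hypothetical names made up for illustration, not part of the RabbitMQ client, and the thread in main merely stands in for the consumer callback.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the RabbitMQ client library.
final class ReplyWaiter {

    private ReplyWaiter() {}

    // Waits up to timeoutMillis for a reply; returns empty on timeout or interruption.
    static Optional<String> awaitReply(BlockingQueue<String> response, long timeoutMillis) {
        try {
            // poll returns null once the timeout elapses, unlike take(), which waits indefinitely.
            return Optional.ofNullable(response.poll(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set for the caller
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        // Stand-in for the consumer callback thread delivering a reply.
        Thread fakeConsumer = new Thread(() -> response.offer("Hello back"));
        fakeConsumer.start();

        System.out.println(awaitReply(response, 1000).orElse("no reply within timeout"));
        // The queue is empty again, so this second wait times out.
        System.out.println(awaitReply(response, 100).orElse("no reply within timeout"));
    }
}
```

In the method above, this would replace the response.take() call, for example awaitReply(response, 5000), with the five-second limit being an arbitrary choice.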
