如何设置春季AMQP发布者确认并返回



我是春季amqp的新人。当配置弹簧amqp发布者确认并返回时,遇到了问题。

AMQP 配置:

SimpleMessageListenerContainer container(CachingConnectionFactory connectionFactory, @Qualifier("topicListenerAdapter")MessageListenerAdapter listenerAdapter) {
    connectionFactory.setChannelCacheSize(5);
    connectionFactory.setPublisherConfirms(true);
    connectionFactory.setPublisherReturns(true);
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("request.queue","reply.queue");
    container.setMessageConverter(json2MessageConverter());
    container.setReceiveTimeout(3000);
    container.setMessageListener(listenerAdapter);
    return container;
}

发送消息:

rabbitTemplate.convertAndSend("spring-boots5", message);
        rabbitTemplate.setConfirmCallback(new ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // TODO Auto-generated method stub
                System.out.println("confirm correlationData is : "+correlationData+"ack is : "+
                ack);
            }
        });
        rabbitTemplate.setMandatory(true);

运行此应用程序时,amqp 收到消息:

Body:'This is my first message'MessageProperties [headers={bar=baz}, timestamp=null, messageId=123456, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=request.queue, deliveryTag=1, messageCount=0]) >

遇到 en 错误:

22 20:48:08.661[0;39m [31mERROR[0;39m [35m37792[0;39m [2m---[0;39m [2m[ 127.0.0.1:5672][0;39m [36mo.s.a.r.s.PublisherCallbackChannelImpl  [0;39m [2m:[0;39m No listener for seq:1

控制台上没有期望的字符串:

"确认相关性数据

是:"+相关性数据+"ack 是:"+ 阿克

并且不知道如何配置回复消息(我使用java配置)

您需要在发送消息之前设置ConfirmCallback(和ReturnCallback)。

您还需要在发送时提供一些相关数据,以便确定确认适用于哪条出站消息。

查看此测试用例...

@Test
public void testPublisherConfirmWithSendAndReceive() throws Exception {
    final CountDownLatch latch = new CountDownLatch(1);
    final AtomicReference<CorrelationData> confirmCD = new AtomicReference<CorrelationData>();
    templateWithConfirmsEnabled.setConfirmCallback(new ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            confirmCD.set(correlationData);
            latch.countDown();
        }
    });
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled);
    container.setQueueNames(ROUTE);
    container.setMessageListener(new MessageListenerAdapter(new Object() {
        @SuppressWarnings("unused")
        public String handleMessage(String in) {
            return in.toUpperCase();
        }
    }));
    container.start();
    CorrelationData correlationData = new CorrelationData("abc");
    String result = (String) this.templateWithConfirmsEnabled.convertSendAndReceive(ROUTE, (Object) "message", correlationData);
    container.stop();
    assertEquals("MESSAGE", result);
    assertTrue(latch.await(10, TimeUnit.SECONDS));
    assertEquals(correlationData, confirmCD.get());
}

最新更新