RabbitTemplate 的 setChannelDeal 标志导致消息未传递到队列



假设我有AMQP匿名队列和扇出交换的应用程序:

@Bean
public Queue cacheUpdateAnonymousQueue() {
return new AnonymousQueue();
}
public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";
@Bean
FanoutExchange cacheUpdateExchange() {
return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
}
@Bean
Binding cacheUpdateQueueToCacheUpdateExchange() {
return bind(cacheUpdateAnonymousQueue())
.to(cacheUpdateExchange());
}

和Spring集成流程:

@Bean
public IntegrationFlow cacheOutputFlow() {
return from(channelConfig.cacheUpdateOutputChannel())
.transform(objectToJsonTransformer())
.handle(outboundAdapter())
.get();
}

我使用出站适配器:

public MessageHandler outboundAdapter() {
rabbitTemplate.setChannelTransacted(true);
return outboundAdapter(rabbitTemplate)
.exchangeName(CACHE_UPDATE_FANOUT_EXCHANGE)
.get();
}

我可以在日志中看到:

o.s.amqp.rabbit.core.RabbitTemplate: Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,4), conn: Proxy@40976c4b Shared Rabbit Connection: SimpleConnection@1cfaa28d [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56042]
o.s.amqp.rabbit.core.RabbitTemplate: Publishing message on exchange [cache.update.fanout], routingKey = []

但是消息没有被传递到绑定到CCD_ 1交换机的队列。

当我在出站适配器中设置rabbitTemplate.setChannelTransacted(false);时,我可以在日志中看到:

o.s.amqp.rabbit.core.RabbitTemplate      : Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@11a1389d Shared Rabbit Connection: SimpleConnection@444c6abf [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56552]
o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message on exchange [cache.update.fanout], routingKey = []

并将消息传递到队列。

为什么在第一种情况下不能传递消息?

为什么RabbitTemplate没有指明什么?

您的日志具有不同的交换名称;我只是这样测试。。。

@SpringBootApplication
public class So60993877Application {
public static void main(String[] args) {
SpringApplication.run(So60993877Application.class, args);
}
@Bean
public Queue cacheUpdateAnonymousQueue() {
return new AnonymousQueue();
}
public static final String CACHE_UPDATE_FANOUT_EXCHANGE = "cache.update.fanout";
@Bean
FanoutExchange cacheUpdateExchange() {
return new FanoutExchange(CACHE_UPDATE_FANOUT_EXCHANGE);
}
@Bean
Binding cacheUpdateQueueToCacheUpdateExchange() {
return BindingBuilder.bind(cacheUpdateAnonymousQueue())
.to(cacheUpdateExchange());
}
@RabbitListener(queues = "#{cacheUpdateAnonymousQueue.name}")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, 
cacheUpdateAnonymousQueue().getName(), "foo");
template.setChannelTransacted(true);
template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, 
cacheUpdateAnonymousQueue().getName(), "bar");
};
}
}

没有问题。

foo
bar

启用确认和返回:

@Bean
public ApplicationRunner runner(RabbitTemplate template) {
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
LOG.info("Return: " + message));
template.setConfirmCallback((correlationData, ack, cause) ->
LOG.info("Confirm: " + correlationData + ": " + ack));
return args -> {
template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, cacheUpdateAnonymousQueue().getName(),
"foo", new CorrelationData("foo"));
//          template.setChannelTransacted(true);
template.convertAndSend(CACHE_UPDATE_FANOUT_EXCHANGE, cacheUpdateAnonymousQueue().getName(),
"bar", new CorrelationData("bar"));
template.convertAndSend("missingExchange", cacheUpdateAnonymousQueue().getName(), "baz",
new CorrelationData("baz"));
Thread.sleep(5000);
};
}

最新更新