如何使用ConnectionListner和/或ChannelListner在RabbitMQ中记录消息传递的失败/成功



我试图记录在RabbitMQ中发送消息期间发生的任何信息或异常,为此我试图在现有连接工厂上添加ConnectionListener。

kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
System.out.println("Connection Created");
}
@Override
public void onShutDown(ShutdownSignalException signal) {
System.out.println("Connection Shutdown "+signal.getMessage());
}
});
kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);       

为了测试异常场景,我从RabbitMQ控制台取消绑定甚至删除了队列。但我没有得到任何异常或任何关闭方法调用。

虽然,当我停止RabbitMQ服务时,我得到了

Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

但这个例外并不是来自我添加的监听器。

我想知道

  1. 为什么我没有从shutdown方法得到任何异常或调用
  2. 如何使用ConnectionListner和/或ChannelListner记录消息传递的失败/成功
  3. 我们可以使用AMQP附加程序吗?如果可以,我们该怎么做?(任何示例/教程(
  4. 确保信息发送的其他方法是什么

注意:我不想使用发布者确认方法。

Connection Refused不是ShutdownSignalException-由于服务器/端口上不存在代理,因此从未建立连接。

您不能使用侦听器来确认单个消息的传递或返回;使用publisher确认并返回。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-是异步

有关如何使用附加程序,请参阅文档。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging

编辑

要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。

下面是一个示例,展示了如何:

@SpringBootApplication
public class So66882099Application {
private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);
public static void main(String[] args) {
SpringApplication.run(So66882099Application.class, args);
}
@RabbitListener(queues = "foo")
void listen(String in) {
}
// consumer side listeners for no connection
@EventListener
void consumerFailed(ListenerContainerConsumerFailedEvent event) {
log.error(event + " via event listener");
if (event.getThrowable() instanceof AmqpConnectException) {
log.error("Broker down?");
}
}
// or
@Bean
ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
return event -> log.error(event + " via application listener");
}
// producer side - use a RetryListener
@Bean
RabbitTemplate template(ConnectionFactory cf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
RetryTemplate retry = new RetryTemplate();
// configure retries here as needed
retry.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
log.error("Send failed " + throwable.getMessage());
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
}
});
rabbitTemplate.setRetryTemplate(retry);
return rabbitTemplate;
}

@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
try {
template.convertAndSend("foo", "bar");
}
catch (Exception e) {
e.printStackTrace();
}
};
}
}

最新更新