Problem handling exceptions thrown by a RabbitMQ listener



I have this listener in my project:

@Service
@RequiredArgsConstructor
@Slf4j
public class ConsumerService {

    @RabbitListener(queues = "${queue.treatment.request}")
    public void handleQueueTreatmentRequestMessageReception(AppointmentPayloadDTO myAppointment) {
        log.info(" ============================  Message received in queue-treatment-plan-newn: " + myAppointment);
        log.info(" ============================  Creating new treatment plan ....");
    }
}

And this error handler:

@Configuration
public class RabbitMQErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        System.out.println("======================================================================================");
        System.out.println("error occurred in message listener and handled in error handler" + t.toString());
        System.out.println("======================================================================================");
    }
}

My goal is to handle the MessageConversionException. The message is sent by another microservice, and the exception is as follows:


org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1693) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1583) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201]
Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert serialized Message content
at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:114) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:302) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:323) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:122) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:205) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:132) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 10 common frames omitted
Caused by: java.lang.IllegalArgumentException: Could not deserialize object
at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:94) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
at org.springframework.amqp.support.converter.SimpleMessageConverter.fromMessage(SimpleMessageConverter.java:110) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 16 common frames omitted
Caused by: java.io.InvalidObjectException: enum constant FOOBOO does not exist in class com.hospital.appointment.enums.Disease
at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2014) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1570) ~[na:1.8.0_201]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[na:1.8.0_201]
at org.springframework.amqp.utils.SerializationUtils.deserialize(SerializationUtils.java:91) ~[spring-amqp-2.2.5.RELEASE.jar:2.2.5.RELEASE]
... 17 common frames omitted
Caused by: java.lang.IllegalArgumentException: No enum constant com.hospital.appointment.enums.Disease.FOOBOO
at java.lang.Enum.valueOf(Enum.java:238) ~[na:1.8.0_201]
at java.io.ObjectInputStream.readEnum(ObjectInputStream.java:2011) ~[na:1.8.0_201]
... 24 common frames omitted

So my problem now is that my ErrorHandler is not being invoked at all. Does anyone know what I am doing wrong here?

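Edit #1:

After reading the Spring documentation more carefully, I found out exactly what was not working in my code:

https://docs.spring.io/spring-amqp/reference/html/#exception-handling

The Spring documentation states it clearly: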

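However, there is a class of errors where the listener cannot control the behavior. When a message that cannot be converted is encountered (for example, an invalid content_encoding header), some exceptions are thrown before the message reaches user code. …

Specifically, it rejects failed messages that fail with the following errors:

o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.

A few lines later, it describes a solution:

You can configure an instance of this error handler with a FatalExceptionStrategy so that users can provide their own rules for conditional message rejection, for example a delegate implementation to the BinaryExceptionClassifier from Spring Retry (Message Listeners and the Asynchronous Case). In addition, the ListenerExecutionFailedException now has a failedMessage property that you can use in the decision.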
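
For anyone wondering what such a "conditional rejection rule" could look like: the decision usually comes down to walking the cause chain of the exception the container hands you, as in the stack trace above, where the real problem (java.io.InvalidObjectException) is buried three levels deep. Below is a minimal sketch of that check using only the JDK. The class name CauseChain, the method name contains, and the depth limit of 50 are my own choices for illustration and are not part of Spring AMQP.

```java
/**
 * Illustrative helper (hypothetical, not a Spring AMQP class): checks whether
 * an exception, or anything in its cause chain, is of a given type.
 */
final class CauseChain {

    // Assumed safety limit so a malformed, cyclic cause chain cannot loop forever.
    private static final int MAX_DEPTH = 50;

    private CauseChain() {
    }

    /** Returns true if t itself or one of its causes is an instance of type. */
    static boolean contains(Throwable t, Class<? extends Throwable> type) {
        int depth = 0;
        for (Throwable current = t; current != null && depth < MAX_DEPTH; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
            depth++;
        }
        return false;
    }
}
```

In a Spring application, a check like CauseChain.contains(t, MessageConversionException.class) could be called from an implementation of FatalExceptionStrategy.isFatal(Throwable), and that strategy passed to the ConditionalRejectingErrorHandler constructor. I have left that wiring out so the sketch stays free of framework dependencies.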

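Edit #2:

After some googling, and following @Borislav Stoilov's second approach, I came up with the following solution, which works for me. Note that I changed the annotation on RabbitMQErrorHandler to @Service: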

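import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ErrorHandler;

@Configuration
public class RabbitMQConsumerConfiguration {
    // @RabbitListener uses the bean named "rabbitListenerContainerFactory" by default,
    // so implementing RabbitListenerConfigurer is not needed here.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ErrorHandler myRabbitMQErrorHandler,
            ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setErrorHandler(myRabbitMQErrorHandler);
        return factory;
    }
}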

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;

@Slf4j
@Service
public class RabbitMQErrorHandler implements ErrorHandler {

    @Override
    public void handleError(Throwable t) {
        log.error("Error occurred in message listener and handled in error handler", t);
        // Reject without requeueing so the broker does not keep redelivering the same bad message.
        throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", t);
    }
}

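Have you tried using a dead letter queue? It is basically a queue that stores all messages that could not be delivered or that failed with an error.

You need to define it as a bean, something like this: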

@Bean
Queue deadLetterQueueBean() {
    return QueueBuilder.durable("custom.dead.letter.queue").build();
}

Then define a listener for it, similar to the one you already have.

This tutorial worked for me: https://docs.spring.io/autorepo/docs/spring-cloud-stream-binder-rabbit-docs/1.1.1.RELEASE/reference/html/rabbit-dlq-processing.html
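
One detail worth spelling out: declaring the dead letter queue is not enough on its own. The original queue also has to tell the broker where rejected messages should go. Here is a configuration sketch of that, assuming the work queue is declared by this application. The name "treatment.request.queue" is hypothetical (the question only shows the ${queue.treatment.request} placeholder), and routing through the default exchange ("") is just one simple option.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterQueueConfig {

    // Hypothetical queue name; replace it with the real value of queue.treatment.request.
    static final String WORK_QUEUE = "treatment.request.queue";
    static final String DEAD_LETTER_QUEUE = "custom.dead.letter.queue";

    @Bean
    Queue deadLetterQueueBean() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    @Bean
    Queue workQueueBean() {
        return QueueBuilder.durable(WORK_QUEUE)
                // "" is the default exchange, which routes by queue name.
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUE)
                .build();
    }
}
```

With this in place, a message rejected without requeue (for example via AmqpRejectAndDontRequeueException) should end up in custom.dead.letter.queue instead of being dropped. If the work queue already exists on the broker without these arguments, redeclaring it with different arguments will likely fail, so you may need to delete the queue first or set the dead-lettering through a broker policy instead.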

Option 2: Set an error handler on the SimpleMessageListenerContainer

class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable genericError) {
        // do something in case of error
    }
}

@Autowired
SimpleMessageListenerContainer simpleMessageListenerContainer;
...
simpleMessageListenerContainer.setErrorHandler(new CustomErrorHandler());
