用@KafkaListener批注的方法不会传播遇到的异常.因此,无法调用我的重试配置



我有一个用@kafkaListener注释的Kafka监听器方法。它接受消息类型的参数和确认。我处理收到的消息并使用acknowledgement.acknowledge((进行手动提交。我已经在容器上设置了重试模板。重试策略是特定于异常的。为此,我创建了自己的 RetryPloicy 类,并使用 ExceptionClassifierRetryPolicy 进行了扩展。在该类中,根据收到的异常,我返回 AlwaysRetryPolicy、NeverRetryPolicy 和 SimpleRetryPolicy。我遇到的问题是,当在侦听器方法中处理消息期间发生 DataAccessException 时,我想永远重试,并且我已经相应地配置了重试策略,但侦听器方法总是抛出 ListenerExecutionFailedException,而不是遇到的异常,该异常在堆栈下方抛出,直到上面的消息处理方法中的侦听器方法。由于此异常是由侦听器引发的,因此我的重试配置无法按预期工作。

示例代码如下:

@KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroType> message, Acknowledgement ack){
SomeAvroType type = message.getPayLoad();
type.processIncomingMessage();
ack.acknowledge();
}

重试策略配置

@component 
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(3);
this.setExceptionClassifier( new Classifier<Throwable, RetryPolicy>()
{
@Override
public RetryPolicy classify( Throwable classifiable ){
// Always Retry when instanceOf TransientDataAccessException
if ( classifiable instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy;
}
else if(classifiable instanceOf SomeOtherException){
return simpleRetryPolicy; 
}
// Do not retry for other exceptions
return new NeverRetryPolicy();
}
} );
}
}

我使用容器上提供的大部分自动配置,因此我在重试配置类中自动连接ConcurrentKafkaListenerContainerFactory。

@configuration
public class RetryConfig{

@Bean
public RetryTemplate retryTemplate(@Autowired @Qualifier("kafkaListenerContainerFactory")ConcurrentKafkaListenerContainerFactory factory;){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(1000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset)
}
}

当我在调试模式下运行它时,我在processIncomingMessage((中抛出一个TransientDataAccessException,我希望总是重试,但侦听器方法不会抛出传播的异常,但它抛出ListenerExecutionFailedException,它的原因(e.getCause(((是TransientDataAccessException。因此,重试策略的计算结果始终为 NeverretryPloicy。有没有办法在侦听器中抛出传播的异常,以便我的重试配置正确执行?

请参阅 BinaryExceptionClassifier 及其traverseCauses属性。

/**
* Create a binary exception classifier.
* @param defaultValue the default value to use
* @param typeMap the map of types to classify
* @param traverseCauses if true, throwable's causes will be inspected to find
* non-default class
*/
public BinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue,
boolean traverseCauses) {
super(typeMap, defaultValue);
this.traverseCauses = traverseCauses;
}

相关内容

最新更新