如何在 spring kafka 中使用 SeekToCurrentErrorHandler 时设置重试间隔时间



我正在使用 spring-boot 应用程序中的 SeekToCurrentErrorHandler 处理容器侦听器错误。我想设置重试间隔,如果发生异常,它应该等待一段时间并重试,直到最大尝试次数。

我尝试通过设置备份策略来添加重试模板。 但它没有像例外的那样工作。 当错误发生最大尝试时间时,它将调用 SeekToCurrentErrorHandler 2 次。

@Bean
public RetryPolicy retryPolicy() {
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
return simpleRetryPolicy;
}

@Bean
public BackOffPolicy backOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(retryInterval);
return backOffPolicy;
}

@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy());
retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
ChainedKafkaTransactionManager<String, String> chainedTM) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setSyncCommits(true);
// ------->>     factory.setRetryTemplate(retryTemplate());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setTransactionManager(chainedTM);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
// handle errors 
}, retryMaxAttempts);
factory.setErrorHandler(errorHandler);
log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
return factory;
}

如何使用 SeekToCurrentErrorHandler 时设置重试间隔时间?

这是下一个版本(2.3(中的新功能。

但是,您可以使用重试模板执行此操作,但尝试总数将是两个属性的倍数。

最新更新