如何用DefaultErrorHandler(spring-kafka)替换弃用的SeekToCurrentErrorH



我正试图找到一种方法来使用新的DefaultErrorHandler,而不是春季kafka 2.8.1中不推荐使用的SeekToCurrentErrorHandler,以便在出现错误时覆盖重试默认行为。我想";停止";重试过程,因此如果发生错误,则不应进行重试。

现在,在一个配置类中,我有了如下bean,它按预期工作:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new **SeekToCurrentErrorHandler(new FixedBackOff(0L, 1L)**));
factory.setConsumerFactory(requestConsumerFactory());
factory.setReplyTemplate(kafkaTemplate());
return factory;
}

由于在这个春季kafka版本中,STCEH被弃用,我尝试在同一个配置类中执行以下操作:

@Bean
public DefaultErrorHandler eh() {
return new DefaultErrorHandler(new FixedBackOff(0, 1));
}

但它似乎不起作用。在出现错误的情况下,重试次数是默认的,正如我在日志中看到的:

[org.springframework.kafka.KafkaListenerEndpointContainer#00-C-1]错误默认错误处理程序-Backoff FixedBackOff{interval=0,currentAttempts=10,maxAttempts=9}已用完topicX

应该如何使用此DefaultErrorHandler来实现所需的行为?或者我应该用别的东西吗?

提前Thx!

factory.setCommonErrorHandler(new Default....)

CommonErrorHandlerbean的引导自动配置需要Boot 2.6。

https://github.com/spring-projects/spring-boot/commit/c3583a4b06cff3f53b3322cd79f2b64d17211d0e

  1. factory.setErrorHandler(新SeekToCurrentErrorHandler(新增FixedBackOff(0L,1L(((;实际上,它最多会重试一次传递(2次传递尝试(。(https://docs.spring.io/spring-kafka/docs/2.7.8/reference/html/#seek-至当前(

  2. 默认重试次数为**9((FixedBackOff(0L,9L((,而不是1(https://docs.spring.io/spring-kafka/docs/2.8.1/reference/html/#default-eh(

  3. 你应该试试setCommonErrorHandler而不是像factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L));那样的setErrorHandler

相关内容

  • 没有找到相关文章

最新更新