Spring Cloud Stream Kafka重试次数是maxAttempts的10倍



我一直在尝试为Spring云流kafka实现重试逻辑,这样,如果在生成主题sample-topic的事件时引发异常,它会再重试两次
我在以下配置中添加到application.properties文件

spring.cloud.stream.bindings.processSampleEvent.destination=sample-topic
spring.cloud.stream.bindings.processSampleEvent.content-type=application/json
spring.cloud.stream.bindings.processSampleEvent.consumer.maxAttempts=2

我编写lister代码的方式是,它只记录收到的消息并抛出NullPointerException,这样我就可以测试重试。

@StreamListener(ListenerBind.SAMPLE_CHANNEL)
public void processSampleEvent(String productEventDto) {
System.out.println("Entering listener: " + productEventDto);
throw new NullPointerException();
}

但是,当通过向sample-topic生成事件进行测试时,我看到在日志中,该事件已经重试了20次,但我在属性中指定只重试两次,当我更改为3次时,还会发生一件奇怪的事情,它重试了30次
我是Spring云流的新手,任何关于这方面的帮助都会非常有帮助。提前感谢😊

侦听器容器中的默认错误处理程序现在是具有10次传递尝试的SeekToCurrentErrorHandler

您可以禁用绑定器中的重试,并使用所需的重试语义配置STCEH,也可以在绑定器中使用重试,并用简单的LoggingErrorHandler替换默认错误处理程序。

要配置容器的错误处理程序,请添加一个ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory>@Bean

我也遇到了同样的问题。我的工作解决方案是创建一个ListenerContainerCustomizerBean,给它所需的最大尝试次数,并设置消费者绑定maxAttempts:1

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?,?>> listenerContainerCustomizer(){
return (container, dest, group) ->
container.setErrorHandler(containerAwareErrorHandler());
}
public SeekToCurrentErrorHandler containerAwareErrorHandler(){
return new SeekToCurrentErrorHandler(new FixedBackOff(0, maxAttempts-1);
}

最新更新