即使在特定配置后,弹簧云kafka流也不会在避免误差后重试。期望是,它应该基于配置的重试策略重试,最后将失败的消息推向DLQ。
以下配置。
spring.cloud.stream.bindings.input_topic.consumer.maxAttempts=7
spring.cloud.stream.bindings.input_topic.consumer.backOffInitialInterval=500
spring.cloud.stream.bindings.input_topic.consumer.backOffMultiplier=10.0
spring.cloud.stream.bindings.input_topic.consumer.backOffMaxInterval=100000
spring.cloud.stream.bindings.iinput_topic.consumer.defaultRetryable=true
public interface MyStreams {
String INPUT_TOPIC = "input_topic";
String INPUT_TOPIC2 = "input_topic2";
String ERROR = "apperror";
String OUTPUT = "output";
@Input(INPUT_TOPIC)
KStream<String, InObject> inboundTopic();
@Input(INPUT_TOPIC2)
KStream<Object, InObject> inboundTOPIC2();
@Output(OUTPUT)
KStream<Object, outObject> outbound();
@Output(ERROR)
MessageChannel outboundError();
}
@StreamListener(MyStreams.INPUT_TOPIC)
@SendTo(MyStreams.OUTPUT)
public KStream<Key, outObject> processSwft(KStream<Key, InObject> myStream) {
return myStream.mapValues(this::transform);
}
kafkatopicproviseer.java中的元数据术始终是无效的,因此它在afterPropertiesSet()
中创建了一个新的retrytemplate。
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
this.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
}
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
this.metadataRetryOperations = metadataRetryOperations;
}
public void afterPropertiesSet() throws Exception {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(10);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100L);
backOffPolicy.setMultiplier(2.0D);
backOffPolicy.setMaxInterval(1000L);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.metadataRetryOperations = retryTemplate;
}
}
重试配置仅适用于基于MessageChannel
的粘合剂。借助KStream粘合剂,Spring只是有助于以规定的方式构建拓扑,一旦构建拓扑,它就不会与消息流有关。
spring-kafka
的下一个版本(粘合剂使用(添加了RecoveringDeserializationExceptionHandler
(在此处提交(;虽然它不禁重试,但可以与DeadLetterPublishingRecoverer
一起使用,将记录发送到一个书信主题。
您可以在处理器/变压器中使用RetryTemplate
重试特定操作。
春季云Kafka流,即使在特定配置之后,避免误差也不会重试。
您看到的行为匹配Kafka流的默认设置,当时它遇到了挑选错误。
来自https://docs.confluent.io/current/current/streams/faq.html#handling-corrupted records-and-deserialization-erialization-Errors-erors-erors-poison-poison-poison-poisor-polecords:
LogAndFailExceptionHandler
实现DeserializationExceptionHandler
,并且是Kafka流中的默认设置。它通过记录错误并丢下致命错误以停止您的流式应用程序来处理任何遇到的避免异常。如果您的应用程序配置为使用LogAndFailExceptionHandler
,则应用程序的实例在遇到损坏的记录时会失败。
我不熟悉Kafka流的Spring立面,但是您可能需要配置所需的org.apache.kafka.streams.errors.DeserializationExceptionHandler
,而不是配置重试(它们是出于不同的目的(。或者,您可能需要实现自己的自定义处理程序(有关更多信息,请参见上面的链接(,然后配置Spring/Kstreams使用它。