Spring-Kafka 2.6.5无限重试策略通过有状态重试和SeekToCurrentErrorHandler.&l



正如标题所示,我使用的是spring-kafka 2.6.5版本。在我的体系结构中,我有一个主题,它有一个SimpleRetryPolicy与一个ExponentialBackoffPolicy配对。如果重试尝试用尽,我有一个RecoveryCallback,它将消息发送到错误主题。错误主题是我的问题所在。

在这个错误主题中,我需要能够执行无限重试并且不让任何消息被丢弃。通过"丢弃",我的意思是如果Spring崩溃或其他同样糟糕的事情发生,我需要确保,当恢复时,任何处于处理中间的消息都可以重新轮询(顺序无关紧要)。基本上,我认为我需要配置ack,以便在处理完成后确认它们。至于无限重试,我搜索了一下,发现了许多有用的建议,来自像Gary Russell这样的用户。不幸的是,不同的spring-kafka版本和弃用使得拼凑出一个明确的解决方案来满足我的需求和版本有点困难。

目前,我的设置看起来像这样:

@KafkaListener(topics = "my_topic", 
groupId = "my_group_id", 
containerFactory = "kafkaErrorListenerContainerFactory")
public void listenErrorTopic(String message) throws Exception {
processingLogic(message);
// Do I need to manually ACK afterwards (and thus also include additional params to access needed 
// message components)?
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap();
...
// Basing the need for the below 2 props off of previously found posts
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Unsure if the below prop is needed
// props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
...
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaErrorListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// A previous post said that infinite retries could only be achieved via state retry and STCEH,
// but there is an alternative in 2.6?
factory.setStatefulRetry(true);
// A previous post had '-1' passed to SeekToCurrentErrorHandler, but that is no longer possible.
// It was suggested instead to pass Long.MAX_VALUE to the backoff period for later versions, but the 
// policy shown was a FixedBackOffPolicy.
factory.setErrorHandler(new SeekToCurrentErrorHandler());
RetryTemplate retryTemplate = new retryTemplate();
retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
// Do I need a recovery callback set in my RetryTemplate if I want it to be infinite?
ExponentialBackOffPolicy backoffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(<props file value insertion here>)
backOffPolicy.setMultiplier(<props file value insertion here>)
backOffPolicy.setMaxInterval(<props file value insertion here>)
retryTemplate.setBackOffPolicy(backoffPolicy);
factory.setRetryTemplate(retryTemplate);
return factory;
}

理想情况下,我更喜欢指数而不是固定,但我主要关心的是在没有max.interval.ms触发再平衡的情况下无限完成它的能力。我在不确定的代码块中留下了注释。如果有人能澄清一些事情,我将不胜感激!

在STCEH支持回退之前,使用有状态重试是专门为STCEH设计的,以避免再平衡。

然而,现在在STCEH中支持back off,最好使用它而不是重试模板。

如果两者都使用,则实际的重试次数是STCEH和重试模板重试次数的倍数。

既然SeekToCurrentErrorHandler可以配置为BackOff,并且具有仅重试某些异常的能力(从2.3版本开始),那么通过侦听器适配器重试配置使用有状态重试就不再需要了。您可以通过错误处理程序的适当配置来提供相同的功能,并从侦听器适配器中删除所有重试配置。有关详细信息,请参阅查找当前容器错误处理程序。

配置更简单。

你不需要使用手动ack;容器将根据AckModeBATCH(默认)或RECORD提交偏移量。后者更昂贵,但提供更少的重新交付的机会。

对于无限重试,在maxAttempts属性中使用带有UNLIMITED_ATTEMPTS (Long.MAX_VALUE)的FixedBackOff

默认情况下,ExponentialBackOff将无限次重试。您只需要确保maxInterval小于max.poll.interval.ms,以避免再平衡。

最新更新