Spring cloud Kafka在失败时进行无限重试



当前,我遇到一个问题,其中一个使用者函数抛出错误,导致Kafka一次又一次地重试记录。

@Bean
public Consumer<List<RuleEngineSubject>> processCohort() {
return personDtoList -> {

for(RuleEngineSubject subject : personDtoList)
processSubject(subject);
};
}

这是消费者processSubject抛出一个自定义错误,导致其失败。

processCohort-in-0:
destination: internal-process-cohort
consumer:
max-attempts: 1
batch-mode: true
concurrency: 10
group: process-cohort-group

以上是我给卡夫卡的活页夹。

目前,我尝试重试2次,然后发送到死信队列,但一直没有成功,不确定采取哪种方法是正确的。

我已经尝试实现了一个自定义处理程序,它将在失败时处理错误,但不会重试,我不知道如何发送到死信队列

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
if (group.equals("process-cohort-group")) {
container.setBatchErrorHandler(new BatchErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
System.out.println(data.records(dest).iterator().);
data.records(dest).forEach(r -> {
System.out.println(r.value());
});
System.out.println("failed payload='{}'" + thrownException.getLocalizedMessage());
}
});
}

};
}

这将停止无限重试,但不会发送死信队列。我能得到关于如何重试两次然后发送死信队列的建议吗。据我所知,当出现错误时,批处理监听器不知道如何恢复,有人能帮助照亮这个吗

重试15次,然后将其抛出到topicname。DLT主题

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()), kafkaBackOffPolicy()));
factory.setConsumerFactory(kafkaConsumerFactory());
return factory;
}

@Bean
public ExponentialBackOffWithMaxRetries kafkaBackOffPolicy() {
var exponentialBackOff = new ExponentialBackOffWithMaxRetries(15);
exponentialBackOff.setInitialInterval(Duration.ofMillis(500).toMillis());
exponentialBackOff.setMultiplier(2);
exponentialBackOff.setMaxInterval(Duration.ofSeconds(2).toMillis());
return exponentialBackOff;
}

您需要在侦听器容器中配置一个合适的错误处理程序;您可以在绑定中禁用retry和dlq,而是使用DeadLetterPublishingRecoverer。在Spring Cloud Stream Kafka Binder 中消费批次时,请参阅答案重试最多3次

相关内容

  • 没有找到相关文章

最新更新