我有批处理@KafkaListener如下:
@KafkaListener(
topicPattern = "ProductTopic",
containerFactory = "kafkaBatchListenerFactory")
public void onBatch(List<Message<String>> messages, Acknowledgment acknowledgment) {
consume(messages); // goes to DB
acknowledgment.acknowledge();
}
我还创建了3个主题:ProductTopic.Retry-1
,ProductTopic.Retry-2
和ProductTopic.Retry-DLT
。想法是使用来自ProductTopic
的批量消息,并在DB批量插入失败时进行非阻塞指数重试。我想在每次重试失败时将消息发布到ProductTopic.Retry-#
,最后将其发送到ProductTopic.Retry-DLT
。还假设由于其他一些限制,我不能让框架为我创建重试和删除主题。
RetryTopicConfigurer
来配置这样的逻辑吗?如何手动定义重试和死信主题的名称?我应该为每个重试和dl主题创建@KafkaListener吗?还是使用RecoveringBatchErrorHandler
的最佳方法?
请分享任何关于这方面的例子和好的做法。我遇到了很多关于这些话题的评论和支持,但是有些评论现在已经很老了,因此与spring-kafka的旧版本有关。我可以看到有一些现代的方法可以处理批处理侦听器,但我也想请@Garry Russell和他的团队给我指出正确的方向。谢谢!
框架的非阻塞重试机制不支持批处理侦听器。
编辑
内置的基础设施与KafkaBackoffAwareMessageListenerAdapter
;您需要创建一个实现BatchAcknowledgingConsumerAwareMessageListener
的版本。
然后应该可以用它包装现有的侦听器,但您还需要一个自定义错误处理程序来将整个批处理发送到下一个重试主题。
这不是微不足道的。