如何在属于AbstractConsumerSeekAware类的Spring Kafka实现seekToEnd() ?&



我有一个需求。我们有一个spring启动kafka消费者应用程序,它从kafka主题读取。我们的要求是,每当应用程序下降和上升,我想开始最新的偏移量,而不是被旧的值所困扰。是否有可能重置组的偏移量?我进行了一些研究,并使用AbstractConsumerSeekAware使用seekToEnd()将偏移量设置为末尾,如下面的代码所示。

public class KafkaConsumer extends AbstractConsumerSeekAware {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@KafkaListener(topics = "${topic.consumer}")
public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
//consuming the message from consumer Record.
seekToEnd();
doSomething();
ack.acknowledge();
}

然而,当我们停止应用程序并重新启动它时,它开始从它离开的最后一个偏移量读取,但我们希望在应用程序启动时仅从偏移量读取。我们如何才能做到这一点?

@KafkaListener方法中这样做是不正确的。只有当消费者从分区交付记录时,才真正调用这个函数。

出于这个原因,您必须实现一个onPartitionsAssigned(),以便消费者在开始轮询这些分区之前查找这些分区。

查看更多文档:https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek

最新更新