处理Kafka Rebalance或应用程序重载上消耗的重复消息



目前,我已经设置了一个spring启动应用程序,它有3个pod运行。

我有一个Kafka消费者,它处理一个特定的任务需要20分钟。

当Kafka在这段时间内重新平衡时,相同的会再次被消耗,所以我已经设置了Redis键,每当消息首先到达时,所以当它重新平衡时,它会检查该键存在并丢弃该事件,因为旧进程仍在运行。

但是现在我有一个场景,一个特定的pod正在运行,可以随时重新启动,没有当应用程序重新启动和相同的消息被消耗,然后,我希望该消息被重新处理,但由于Redis密钥存在,它丢弃这个事件,但旧进程不运行。

我必须在应用程序重启时重新处理消息,并在Kafka重新平衡的情况下丢弃它。我该如何处理这种情况?

  1. 通过增加max.poll.interval.ms来避免再平衡

  2. Kafka并不适合这种长时间运行的任务;当从Kafka接收到任务时,考虑将任务移动到DB(甚至Redis),并从那里处理。

另一种解决方案是将max.poll.records设置为1,然后在另一个线程上运行任务之前暂停侦听器容器,然后在作业完成时恢复容器。

暂停容器将保持消费者存活并避免再平衡。

最新更新