我在玩Kafka,试图掌握它。我们需要做的一件事是运行负载平衡的服务器集-以实现冗余/高可用性等-然后相互独立地重新启动。应该很简单。
不过我的发现有点奇怪。如果我运行一个正在处理一组消息的Kafka使用者,然后在处理消息的同时将第二个使用者添加到同一使用者组,那么我会多次获得整个消息集,而不是一次。
例如,以下是我在这样一次运行中的日志文件:https://gist.github.com/sazzer/5604d0652ff14533654c8b543942c10e
这是使用2个主题-卡夫卡现场和卡夫卡批量。每个主题有2个分区,每个用户一个分区。然后,测试将向批量队列添加20条消息,然后向实时队列添加10条消息。(那实际上是在测试其他东西,但我只是重新使用了设置)
从日志中,您将看到每条消息总共被处理了3次,而不是像我预期的那样只处理一次。
其代码如下:https://gist.github.com/sazzer/c67e4db9a04aac8c0d46bbc21188775d
这是使用Spring Boot和Spring Kafka,除了这一个案例之外,它刚刚成功。
当新消费者出现时,我是不是错过了一些东西来阻止它重播所有信息?还是这只是我必须处理的事情?
干杯
尝试将ConsumerConfig.ENABLE_AUTO_COMMIT
设置为false
。
侦听器容器不依赖客户端进行提交,而是在处理完所有记录后,在每个BATCH记录之后提交偏移量;当重新平衡发生时,它还将提交任何未决的偏移;您也可以将AckMode
设置为RECORD
,它将为处理的每条记录提交偏移量。
您也可以手动将分区分配给实例,而不使用组管理进行分配。