Spring cloud stream-Kafka消费者使用StreamListener消费重复消息



使用我们的春季启动应用程序,我们注意到kafka消费者只在prod-env中随机消费两次消息。我们有6个实例,在PCF中部署了6个分区。我们在同一主题中捕获了两次具有相同偏移量和分区的消息,这会导致重复,对我们来说是业务关键。我们在非生产环境中没有注意到这一点,在非生产的环境中很难复制。我们最近转向了卡夫卡,但我们无法找出根本问题。

我们正在使用弹簧云流/弹簧云流粘合剂kafka-2.1.2这是配置:

spring:
cloud:
stream:
default.consumer.concurrency: 1 
default-binder: kafka
bindings:
channel:
destination: topic
content_type: application/json
autoCreateTopics: false
group: group
consumer:
maxAttempts: 1
kafka:
binder:
autoCreateTopics: false
autoAddPartitions: false
brokers: brokers list
bindings:
channel:
consumer:
autoCommitOnError: true
autoCommitOffset: true
configuration:
max.poll.interval.ms: 1000000
max.poll.records: 1 
group.id: group

我们使用@Streamlisteners来消费消息。

以下是我们在服务器日志中收到的重复实例和错误消息。

错误46---[container-0-C-1]o.a.k.C.internals.ConsumerCoordinator:[Customer clientId=Consumer-3,groupId=group]偏移提交失败在分区主题0上偏移1291358:协调器不知道该成员。错误46---[容器0-C-1]o.s.kafka.listener.LoggingErrorHandler:处理时出错:null退出org.apache.kafka.clients.consumer.CommitFailedException:无法完成提交,因为组已重新平衡并且已将分区分配给另一个成员。这意味着对poll((的后续调用之间的间隔比配置的max.poll.interval.ms,这通常意味着轮询循环花费太多时间处理消息。你可以解决这个问题通过增加会话超时或减少最大值poll((中返回的批处理大小,最大值为.poll.records。网址:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordiinator.java:871(~[kafka-clients-2.0.1.jar!/:na]

没有崩溃,并且在复制时所有实例都是健康的。此外,错误日志也存在混淆-处理时出错:由于消息已成功处理两次,因此为null。max.poll.interval.ms:100000,大约16分钟,应该有足够的时间处理系统的任何消息,会话超时和检测信号配置是默认的。在大多数实例中,在2秒内收到重复。我们缺少什么配置吗?如有任何建议/帮助,我们将不胜感激。

由于组已重新平衡,无法完成提交

由于您的听众花费了太长时间,因此发生了重新平衡;您应该调整max.poll.recordsmax.poll.interval.ms,以确保始终能够在时限内处理收到的记录。

无论如何,卡夫卡并不保证只交付一次,而是至少交付一次。您需要在应用程序中添加幂等性,并检测/忽略重复项。

此外,请记住StreamListener和基于注释的编程模型已经弃用3年多了,并且已从当前主版本中删除,这意味着下一个版本将没有它。请将您的解决方案迁移到基于功能的编程模型

最新更新