我有这个kafka消费者:
new ReactiveKafkaConsumerTemplate<>(createReceivingOptions())
它愉快地处理消息,我设置
max-poll-records=1
这样事情就不会发生在我身上。我可以通过在
上的poll
方法中记录断点来验证final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
poll返回了多少条记录,是的,它是1。然后我要求它暂停所有分配的分区。在日志中可以看到它工作了!
o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=Testing, groupId=Testing] Pausing partitions [TestTopic-0]
,从这一点上我可以看到,投票只有0条记录,还有这个日志:
Skipping fetching records for assigned partition TestTopic-0 because it is paused
好的,它工作了!等等,为什么我的整个话题都被处理了?
然后我发现,在某一点上还有这个日志:
Consumer clientId=Testing, groupId=Testing] Resuming partitions [TestTopic-0]
什么?谁打来的?然后我还发现,到处都有多个暂停请求,而不仅仅是我实际调用的那个。
暂停以某种方式由响应使用,不能手动使用?或者有人解释为什么…clients.consumer.KafkaConsumer
一直在自己暂停/恢复主题,而手动暂停因为没有暂停?
在审查ConsumerEventLoop
代码后,响应客户端内部使用pause/resume
来处理反压力-当下游无法接收更多数据时,他暂停所有分配的分区,并在反压力解除时无条件恢复它们。
在我看来,它需要跟踪暂停是否因为反压而完成,并且只有在这种情况下才恢复。
看起来在这次提交之前是这样做的。
也许你可以用反压来强制暂停?