什么时候/为什么卡夫卡分区可以取消暂停"by itself"?



我有这个kafka消费者:

new ReactiveKafkaConsumerTemplate<>(createReceivingOptions())

它愉快地处理消息,我设置

max-poll-records=1

这样事情就不会发生在我身上。我可以通过在

上的poll方法中记录断点来验证
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);

poll返回了多少条记录,是的,它是1。然后我要求它暂停所有分配的分区。在日志中可以看到它工作了!

o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=Testing, groupId=Testing] Pausing partitions [TestTopic-0]

,从这一点上我可以看到,投票只有0条记录,还有这个日志:

Skipping fetching records for assigned partition TestTopic-0 because it is paused

好的,它工作了!等等,为什么我的整个话题都被处理了?

然后我发现,在某一点上还有这个日志:

Consumer clientId=Testing, groupId=Testing] Resuming partitions [TestTopic-0]

什么?谁打来的?然后我还发现,到处都有多个暂停请求,而不仅仅是我实际调用的那个。

暂停以某种方式由响应使用,不能手动使用?或者有人解释为什么…clients.consumer.KafkaConsumer一直在自己暂停/恢复主题,而手动暂停因为没有暂停?

在审查ConsumerEventLoop代码后,响应客户端内部使用pause/resume来处理反压力-当下游无法接收更多数据时,他暂停所有分配的分区,并在反压力解除时无条件恢复它们。

在我看来,它需要跟踪暂停是否因为反压而完成,并且只有在这种情况下才恢复。

看起来在这次提交之前是这样做的。

也许你可以用反压来强制暂停?

最新更新