Kafka 的 session.timeout.ms 和 max.poll.interval.ms 之差 >= 0.10.1



我不清楚为什么我们需要session.timeout.msmax.poll.interval.ms,什么时候我们使用一个或另一个或两者?这两种设置似乎都表明了协调器在假定消费者死亡之前等待从消费者那里获得心跳的时间上限。

基于KIP-62的0.10.1.0+版本的表现如何?

在KIP-62之前,只有session.timeout.ms(即Kafka 0.10.0及更早)。max.poll.interval.ms通过KIP-62 (Kafka 0.10.1的一部分)引入。

KIP-62,通过后台心跳线程将心跳从poll()调用中解耦,允许比心跳间隔更长的处理时间(即两个连续poll()之间的时间)。

假设处理一条消息需要1分钟。如果心跳和轮询是耦合的(即,在KIP-62之前),您需要将session.timeout.ms设置大于1分钟,以防止消费者超时。但是,如果一个消费者死亡,它也需要超过1分钟的时间来检测失败的消费者。

KIP-62分离轮询和心跳,允许在两个连续轮询之间发送心跳。现在您有两个线程在运行,心跳线程和处理线程,因此,KIP-62为每个线程引入了超时。session.timeout.ms为心跳线程,max.poll.interval.ms为处理线程。

假设您设置了session.timeout.ms=30000,因此,消费者心跳线程必须在此时间到期之前向代理发送一个心跳。另一方面,如果处理单个消息需要1分钟,则可以将max.poll.interval.ms设置为大于1分钟,以给处理线程更多的时间来处理消息。

如果处理线程死亡,则需要max.poll.interval.ms来检测。然而,如果整个消费者死亡(并且一个死亡的处理线程很可能使整个消费者崩溃,包括心跳线程),它只需要session.timeout.ms来检测它。

这个想法是,允许快速检测失败的消费者,即使处理本身需要很长时间。

<<p> Implemenation细节/strong>

新的超时max.poll.interval.ms主要是客户端概念:如果max.poll.interval.ms内没有调用poll(),心跳线程将检测到这种情况并向代理发送leave-group请求。max.poll.interval.ms仍然与消费者组重平衡相关:如果重平衡被触发,消费者有max.poll.interval.ms时间通过调用poll()客户端触发加入组请求来重新加入组。

最新更新