默认情况下,max.poll.intervals.ms设置为int.max



apache kafka文档状态:

内部kafka流消费者max.poll.interval.ms默认值 已从300000更改为integer.max_value

由于此值用于检测批次记录的处理时间超过给定阈值时,是否有这样的"无限"值的原因?

它是否使应用程序变得无响应?或Kafka流的方法在处理时间太长时离开消费者组?

它是否使应用程序变得无响应?或Kafka Streams在处理时间太长时有不同的方式离开消费者组?

kafka流在这种情况下利用Kafka消费者客户端的心跳功能,因此从呼叫poll()中解除了心跳("此应用程序实例仍然活着?)。这两个主要参数是session.timeout.ms(用于心跳线)和max.poll.interval.ms(用于处理线程),其差异在https://stackoverflow.com/a/39759329/1743580中更详细地描述。

引入了令人心动的引入,以便可以允许应用程序实例花费大量时间处理记录,而不必考虑"不取得进步"。因此"死了"。例如,您的应用程序可以为一分钟的单一记录做很多事情,同时仍然令人心动地对kafka; quot"嘿,我还活着,而我 am 得了进步。但是我根本还没有完成处理。敬请期待。&quot"

当然,您可以将max.poll.interval.ms从其默认值(Integer.MAX_VALUE)更改为较低的设置,例如,您实际上确实希望将应用程序实例视为" Dead"如果在投票记录之间需要超过x秒的时间,那么处理最新一轮记录的时间超过x秒。是否取决于您的特定用例,无论是否有意义 - 在大多数情况下,默认设置是一个安全的选择。

session.timeout.ms:使用KAFKA的组管理设施时用于检测消费者故障的超时。消费者会发出定期的心跳,以表明其对经纪人的伤害。如果经纪人在本次会议超时到期之前未收到任何心跳,则经纪人将从小组中删除该消费者并启动重新平衡。请注意,该值必须在group.min.session.timeout.ms andgroup.max.session.timeout.ms.mm.ms。

中,在Broker配置中配置为允许范围。

max.poll.interval.ms:使用消费者组管理时,Poll()调用之间的最大延迟。这将消费者在获取更多记录之前可能会闲置的时间上有一个上限。如果在此超时到期之前未调用Poll(),则将消费者视为失败,并且该组将重新平衡以将分区重新分配给另一个成员。

最新更新