如何检测处于僵尸状态的Kafka Streams应用程序



我们的一个Kafka Streams应用程序的StreamThread使用者在生成以下日志消息后进入僵尸状态:

[使用者客户端id=通知-处理器-db9aa8a3-6c3b-453b-b8c8-106bf2fa257d-StreamThread-1-使用者,组id=通知处理器]成员通知-处理器-db9aa8a3-6 c3b-453b-8c8-106bf2fa257d-StreamThread-1-消费者b2b9eac3-c374-43e2-bbc3-d9ee514a3c16由于使用者轮询超时而向协调器发送LeadGroup请求***:9092(id:2147483646 rack:null(过期。这意味着后续调用poll((的时间比配置的max.poll.interval.ms长,这通常意味着轮询循环在处理消息方面花费了太多时间。您可以通过增加max.poll.interval.ms或使用max.poll.record.减少poll((中返回的批的最大大小来解决此问题

StreamThread的Kafka Consumer似乎已经离开了消费者组,但Kafka Streams应用程序仍处于RUNNING状态,同时没有消费任何新记录。

我想检测一下Kafka Streams应用程序是否已进入僵尸状态,以便关闭它并用新实例替换它。通常,我们通过Kubernetes健康检查来验证Kafka Streams应用程序是否处于RUNNING或REPARTITIONING状态,但这在这种情况下不起作用。

因此,我有两个问题:

  1. 当Kafka Streams应用程序没有活跃的消费者时,它是否会保持运行状态?如果是:为什么
  2. 我们如何(通过程序/指标(检测到Kafka Streams应用程序已经进入了没有活跃消费者的僵尸状态

当Kafka Streams应用程序没有活动的消费者时,它是否会保持运行状态?如果是:为什么?

这取决于版本。在旧版本(2.1.x及更早版本(中,Kafka Streams确实会保持在RUNNING状态,即使所有线程都死了。此问题在v2.2.0中通过https://issues.apache.org/jira/browse/KAFKA-7657.

我们如何(通过程序/指标(检测到Kafka Streams应用程序已进入这样一个没有活跃消费者的僵尸状态?

即使在旧版本中,也可以在KafkaStreams客户端上注册未捕获的异常处理程序。每次StreamThreads死亡时都会调用此处理程序。

顺便说一句:在即将发布的2.6.0版本中,添加了一个新的度量alive-stream-threads来跟踪运行线程的数量:https://issues.apache.org/jira/browse/KAFKA-9753

仅供参考,在用户邮件列表上也有类似的讨论——主题行"kafka流僵尸状态";

首先,我要告诉你们我在那里说了什么,因为到目前为止,对话中似乎存在一些误解:基本上,错误消息有点误导,因为它暗示这是由消费者自己记录的,并且它是当前发送此LeaveGroup/已经注意到它错过了投票间隔。但是当注意到主使用者线程没有在最大轮询超时内轮询时,这个消息实际上被心跳线程记录了下来,并且从技术上讲只是将其标记为"0";需要重新加入";以便消费者知道在其最终再次进行轮询时发送该LeaveGroup。然而,如果使用者线程实际上被卡在用户/应用程序代码中的某个位置,并且无法中断以继续轮询循环,那么使用者将永远不会真正触发重新平衡、尝试重新加入、发送LeaveGroup请求等。因此,这就是为什么状态继续为RUNNING而不是REBALANCING的原因。

出于上述原因,像num-alive-stream-threads这样的指标也无济于事,因为线程并没有消亡——它只是被卡住了。事实上,即使线解开了,它也会重新连接,然后像往常一样继续,它不会"断裂";模具;(因为只有在遇到致命异常时才会发生这种情况(。

长话短说:broker和heartbeat线程已经注意到消费者不再在组中,但StreamThread很可能被卡在拓扑中的某个位置,因此消费者本身实际上不知道它已经被踢出了消费者组

相关内容

  • 没有找到相关文章

最新更新