我想要实现的是确保我的Kafka流消费者没有滞后。
我有一个简单的Kafka流应用程序,它以GlobalKTable的形式将一个主题具体化为存储。
当我试图用命令描述Kafka上的消费者时:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-application-id
我看不到任何结果。而且也没有错误。当我按以下方式列出所有消费者时:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups
我的应用程序使用者已正确列出。
知道在哪里可以找到我无法描述消费者的其他信息吗?(任何其他写主题的Kafka流消费者都可以正确描述。(
如果您的应用程序仅将主题具体化为GlobalKTable
,则不会形成消费者组。在内部,"全局使用者"不使用subscribe()
,而是使用assign()
,并且没有配置使用者group.id
(您可以从日志中验证(,也没有提交偏移量。
原因是,所有应用程序实例都需要使用所有主题分区(即广播模式(。然而,消费者组的设计使得不同的实例为同一主题读取不同的分区。此外,对于每个使用者组,每个分区只能提交一个偏移量——但是,如果多个实例读取同一个分区并使用相同的group.id
提交偏移量,则提交将相互覆盖。
因此,在"广播"数据的同时使用消费者组是行不通的。
但是,所有消费者都应暴露"滞后"指标records-lag-max
和records-lag
(cfhttps://kafka.apache.org/documentation/#consumer_fetch_monitoring)。因此,您应该能够通过JMX来监控滞后。Kafka Streams也通过KafkaStreams#metrics()
包含客户端度量。