卡夫卡消费者:抵消消费者和poducer之间的滞后



我正在编写一个Kafka使用者应用程序,其中每个分区有一个使用者。代码显示在下方

while (true) {  
ConsumerRecords<String, String> records = consumer.poll(100);   
for (ConsumerRecord<String, String> record : records) {     
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn",        
record.topic(), record.partition(), record.offset(), record.key(), record.value());   
}    
consumer.commitAsync(new OffsetCommitCallback() {       
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {  
if (e != null)              
log.error("Commit failed for offsets {}", offsets, e); }}); 
}

是否有一种方法可以通过编程访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移量和某个生产者写入该使用者分区的最后一个记录的偏移之间的位置差

知道我的最终目标是将这个值发送给prometheus进行监控,我应该在上面添加哪些语句来获得滞后偏移值

如果您的目标是在prometheus中拥有数据,那么您应该使用Consumers发出的records-lag度量。

在文档中Consumer Fetch Metrics部分的最后一个表中,您可以看到Consumer按主题分区发出滞后(和超前(。

默认情况下,度量是通过JMX发出的,因此您可以使用Prometheus的JMX导出器来完成这项工作。

您可以使用各种选项来监控您的消费者滞后。

使用KafkaConsumer的endOffsets API

根据KafkaConsumer上的JavaDocs,可以使用CCD_;获取给定分区的结束偏移量">

使用JMX度量

您可以使用JMX度量来利用Kafka的监控功能。在关于监控卡夫卡的文献中,解释了如何获得";使用者落后于生产者的消息数。由使用者而非代理发布">

最新更新