在春季应用程序prometheus中启用kafka客户端指标



我有一个应用程序,它使用从kafka主题并生成到kafka的主题。我正在尝试设置Grafana仪表板来显示卡夫卡指标。我正试图向普罗米修斯公开卡夫卡指标,这样Grafana仪表板就可以获取这些指标并进一步显示它们。

以下是我正在使用的库和属性。

org.apache.kafka:kafka-clients:jar:2.5.1:compile
io.micrometer:micrometer-registry-prometheus:jar:1.5.4:compile
io.micrometer:micrometer-jersey2:jar:1.5.4:compile

应用程序属性:

spring.jmx.enabled=true
management.endpoints.web.base-path=/
management.endpoint.metrics.enabled=true
management.endpoints.web.exposure.include=prometheus
management.endpoint.prometheus.enabled=true
management.metrics.export.prometheus.enabled=true

使用千分尺,prometheus显示jvm、tomcat相关度量以及自定义度量。但卡夫卡指标并没有被曝光。我试着调试,却毫无头绪地离开了。任何建议都会很有帮助。

作为提醒,我没有使用spring-kafka注释。我将它作为独立的多线程运行,在那里我使用consumer.poll(1000L)方法获取记录。

卡夫卡消费者被创建为:new KafkaConsumer<>(getProps()),卡夫卡生产者被创建为new KafkaProducer<>(props)

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props));
consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
logger.info("PARTITIONS revoked: " + partitions);
consumer.commitAsync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("PARTITIONS assigned: " + partitions);
}
});
return consumer;
}

我已经在Gitter上回答了你——上面的代码不是使用Spring for Apache Kafka,你正在创建自己的消费者和生产者。

如果您将Spring用于Apache Kafka的DefaultKafkaConsumerFactory,则可以向其添加MicrometerConsumerListener,Spring将通过KafkaClientMetrics向仪表注册表注册每个消费者的指标。如果你创造自己的消费者,你必须自己去做。

制片人也是如此。

最新更新