我正在收集kafka流应用程序的一系列指标,我的问题是我想要一个特定名称的米的统一值。为了更清楚一点,这些指标被表示为一个n项的数组,其中n是为Kafka Streams应用程序配置的线程数。我必须补充一点,我可以通过添加‘FunctionCounter’的总和来合并这些值。然而,当我感兴趣的仪表更新时,我没有触发"aggregator方法"的机制。aggregator方法复制如下:
private Double aggregateValues(String idName){
return meterRegistry.getMeters().stream()
.filter(meter -> meter.getId().getName().startsWith(idName))
.filter(FunctionCounter.class::isInstance)
.map(FunctionCounter.class::cast)
.mapToDouble(FunctionCounter::count)
.sum();
}
我创建了一个配置bean来做同样的尝试,不高兴
@Configuration
@Component
@Slf4j
public class Metrics {
@Bean
public FunctionCounter getAggregateCounter(MeterRegistry registry) {
List<FunctionCounter> counters = registry.getMeters().stream().
filter(meter -> meter.getId().getName().
startsWith("Output_Message_Count"))
.filter( FunctionCounter.class::isInstance )
.map(FunctionCounter.class::cast)
.collect(Collectors.toList());
FunctionCounter counter = FunctionCounter
.builder("Combined_Output_Message_Count", counters, state -> state.stream().mapToDouble(FunctionCounter::count).sum())
.description("a description of what this counter does")
.tags("region", "test")
.register(registry);
return counter;
}
}
下面列出的驱动器/prometheus endpoint os的原始数据输出示例
# HELP kafka_stream_thread_task_created_total The total number of newly created tasks
# TYPE kafka_stream_thread_task_created_total counter
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-4",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-3",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-2",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-1",} 5.0
最终目标是拥有一个在总数更新时合并它们的仪表,在上面显示的原始数据的情况下,为20在接下来的提案中已经有关于这项工作的讨论,但除了最初的提案阶段,它还没有进展该提案可在以下网址查看。以下网址:https://cwiki.apache.org/confluence/display/kafka/kip - 674% - 3 +标准+记者总+标准+ + + +卡夫卡+流
这种聚合应该发生在报告端。例如,在Prometheus(看起来您正在使用它)中,您将编写一个查询来自动聚合它们:
kafka_stream_thread_task_created_total
或者,如果你的最终目标是丢弃像thread_id
这样的标签,那么你可以使用MeterFilter
来删除该标签,这样所有这些计数器都被视为相同的计数器。
我建议编写一个普罗米修斯级别的查询,而不是编写java代码。