如何聚合来自一组千分尺计数器的值



我正在收集kafka流应用程序的一系列指标,我的问题是我想要一个特定名称的米的统一值。为了更清楚一点,这些指标被表示为一个n项的数组,其中n是为Kafka Streams应用程序配置的线程数。我必须补充一点,我可以通过添加‘FunctionCounter’的总和来合并这些值。然而,当我感兴趣的仪表更新时,我没有触发"aggregator方法"的机制。aggregator方法复制如下:

private Double aggregateValues(String idName){
return meterRegistry.getMeters().stream()
.filter(meter -> meter.getId().getName().startsWith(idName))
.filter(FunctionCounter.class::isInstance)
.map(FunctionCounter.class::cast)
.mapToDouble(FunctionCounter::count)
.sum();
}

我创建了一个配置bean来做同样的尝试,不高兴

@Configuration
@Component
@Slf4j
public class Metrics {
@Bean
public FunctionCounter getAggregateCounter(MeterRegistry registry) {
List<FunctionCounter> counters = registry.getMeters().stream().
filter(meter -> meter.getId().getName().
startsWith("Output_Message_Count"))
.filter( FunctionCounter.class::isInstance )
.map(FunctionCounter.class::cast)
.collect(Collectors.toList());
FunctionCounter counter = FunctionCounter
.builder("Combined_Output_Message_Count", counters, state -> state.stream().mapToDouble(FunctionCounter::count).sum())
.description("a description of what this counter does")
.tags("region", "test")
.register(registry);
return counter;
}
}

下面列出的驱动器/prometheus endpoint os的原始数据输出示例

# HELP kafka_stream_thread_task_created_total The total number of newly created tasks
# TYPE kafka_stream_thread_task_created_total counter
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-4",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-3",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-2",} 5.0
kafka_stream_thread_task_created_total{kafka_version="2.7.1",spring_id="stream-builder-process",thread_id="sainsburys.applications.sc-dis.price-specification-acl-e0e5af91-ce55-4e0c-998d-269b9c6bade0-StreamThread-1",} 5.0

最终目标是拥有一个在总数更新时合并它们的仪表,在上面显示的原始数据的情况下,为20在接下来的提案中已经有关于这项工作的讨论,但除了最初的提案阶段,它还没有进展该提案可在以下网址查看。以下网址:https://cwiki.apache.org/confluence/display/kafka/kip - 674% - 3 +标准+记者总+标准+ + + +卡夫卡+流

这种聚合应该发生在报告端。例如,在Prometheus(看起来您正在使用它)中,您将编写一个查询来自动聚合它们:

kafka_stream_thread_task_created_total

或者,如果你的最终目标是丢弃像thread_id这样的标签,那么你可以使用MeterFilter来删除该标签,这样所有这些计数器都被视为相同的计数器。

我建议编写一个普罗米修斯级别的查询,而不是编写java代码。

相关内容

  • 没有找到相关文章

最新更新