在 OpenTelemetry Python 中更新 ObservableGauge



我使用的是 opentelemetry-api 1.14 和 opentelemetry-sdk 1.14。我知道如何创建和使用 Counter 和 ObservableGauge 这两种指标工具(instrument)。但是,我需要在应用程序的各处更新和设置 gauge 的值,就像 Counter 通过其 add 方法更新那样。下面是一段可以运行的代码,但在这段代码中,gauge 的值是固定的,始终为 9。

import time
""" API is the interface that you should interact with."""
from opentelemetry import metrics
"""
SDK is the implementation. Only access SDK during initialization, startup, and shutdown.
"""
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
def initialize():
    """Install a global MeterProvider that prints metrics to the console.

    Wraps a ConsoleMetricExporter in a periodic reader with an export
    interval of 5000 ms and registers the resulting provider through
    ``metrics.set_meter_provider``.
    """
    service_resource = Resource(attributes={"service.name": "otel-test"})
    # Console exporter, read and exported every 5 seconds.
    console_reader = PeriodicExportingMetricReader(
        ConsoleMetricExporter(),
        export_interval_millis=5000,
    )
    meter_provider = MeterProvider(
        metric_readers=[console_reader],
        resource=service_resource,
    )
    metrics.set_meter_provider(meter_provider)
# Configure the SDK once, then obtain the meter and instruments via the API.
initialize()
provider = metrics.get_meter_provider()
meter = provider.get_meter("my-demo-meter")
simple_counter = meter.create_counter(
    "simple_counter",
    description="simply increments each loop",
)
# Async Gauge
def observable_gauge_func(options):
    """Callback for ``simple_gauge``: yield one fixed observation of 9."""
    fixed_observation = metrics.Observation(9, {})
    yield fixed_observation


simple_gauge = meter.create_observable_gauge(
    "simple_gauge",
    callbacks=[observable_gauge_func],
)
# How can I update simple_gauge in main
def main():
    """Print and count loop iterations forever, pausing 5 s between them."""
    iteration = 0
    while True:
        print(iteration)
        simple_counter.add(1)
        iteration += 1
        # How can I update simple_gauge here?
        time.sleep(5)


main()

我不确定这是否是实现 ObservableGauge 的最佳模式,但针对我在问题中描述的需求(即在 main 函数中更新测量值),我在自己的应用程序中就是这样实现的。鉴于 ObservableGauge 的示例很少,我认为值得把它分享出来。

import time
import random
""" API is the interface that you should interact with."""
from opentelemetry import metrics
"""
SDK is the implementation. Only access SDK during initialization, startup, and shutdown.
"""
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
def initialize():
    """Register a global MeterProvider backed by a console exporter.

    The provider carries a resource with ``service.name`` set to
    ``otel-test`` and a single periodic reader whose export interval is
    5000 ms.
    """
    console_reader = None
    otel_resource = Resource(attributes={"service.name": "otel-test"})
    # Console Exporter
    console_exporter = ConsoleMetricExporter()
    console_reader = PeriodicExportingMetricReader(
        console_exporter, export_interval_millis=5000
    )
    sdk_provider = MeterProvider(
        metric_readers=[console_reader], resource=otel_resource
    )
    metrics.set_meter_provider(sdk_provider)
# Set up the SDK first; everything below goes through the metrics API.
initialize()
provider = metrics.get_meter_provider()
meter = provider.get_meter("my-demo-meter")
simple_counter = meter.create_counter(
    "simple_counter",
    description="simply increments each loop",
)
def create_simple_gauge(signal):
    """Register an observable gauge that reports ``signal``'s current value.

    ``signal`` must provide ``get_current_value()`` and an ``attribute``
    field; both are read each time the callback runs. Returns the
    instrument created by ``meter.create_observable_gauge``.
    """

    # Async Gauge
    def observable_gauge_func(options):
        reading = signal.get_current_value()
        labels = {"simple_attribute": signal.attribute}
        yield metrics.Observation(reading, labels)

    return meter.create_observable_gauge(
        "simple_gauge",
        callbacks=[observable_gauge_func],
    )

class Signal:
    """Mutable holder for the latest value a gauge callback should report.

    Args:
        attribute: Label stored on the instance as ``attribute``.
        initial_value: Value returned by ``get_current_value`` until
            ``set_current_value`` is first called. Defaults to 0.
    """

    def __init__(self, attribute, initial_value=0):
        self.attribute = attribute
        # Initialise eagerly so a read that happens before the first write
        # returns a value instead of raising AttributeError.
        self.current_value = initial_value

    def set_current_value(self, i):
        """Store ``i`` as the current value."""
        self.current_value = i

    def get_current_value(self):
        """Return the most recently stored value."""
        return self.current_value
def main():
    """Push a random value in [0, 5] to the gauge signal every 5 seconds."""
    simple_signal = Signal("simple_attribute")
    create_simple_gauge(simple_signal)
    iteration = 0
    while True:
        print(iteration)
        simple_counter.add(1)
        iteration += 1
        sample = random.randint(0, 5)
        print(sample)
        # The gauge callback reads this value from the signal when it runs.
        simple_signal.set_current_value(sample)
        time.sleep(5)


main()

最新更新