Apache Flink:以key为标签的度量



我在Flink上使用PrometheusReporter,并且有一些计数器我想用键流的键导出。

我没有open上的键,所以我试着用processElement来代替,就像:

// job
myStream.keyBy(Foo::getKey)
.process(new FooOperator())
.name("foo");
// operator
public class FooOperator extends KeyedProcessFunction<String, Foo, Bar> {
private transient Counter counter;
@Override
public void processElement(Foo value, Context ctx, Collector<Bar> out) throws Exception {
if (counter == null) {
counter = getRuntimeContext().getMetricGroup().addGroup("key", value.getKey()).counter("my_counter"):
}
counter.inc()
out.collect(new Bar(foo));
}
}

但是我仍然得到:

Name collision: Group already contains a Metric with the name 'my_counter'. Metric will not be reported.

有什么我能做的吗?也许有更好的办法,我没找到?

我的想法是得到任何计数器已经存在,但我找不到一种方法来做到这一点,不是反射…

给定的KeyedProcessFunction实例跨许多不同的键进行多路复用。当上下文中没有特定的键时,open方法只被调用一次,因此您不能在那里创建每个键的指标。

我认为到目前为止你所尝试的有两个问题。首先,您需要为每个不同的键创建一个单独的Counter——而不是每个KeyedProcessFunction实例创建一个。其次,我不确定addGroup("key", value.getKey())是否会在这里工作。如果你在解决第一个问题后仍然有问题,那么也许可以尝试addGroup("key").addGroup(value.getKey())代替。

最后,如果你的键空间很大,特别是如果它是无界的,那么整个方法是有问题的。Flink的度量系统并不是为了扩展到大量的度量而设计的。

最新更新