我在Flink上使用PrometheusReporter
,并且有一些计数器我想用键流的键导出。
我没有open
上的键,所以我试着用processElement
来代替,就像:
// job
myStream.keyBy(Foo::getKey)
.process(new FooOperator())
.name("foo");
// operator
public class FooOperator extends KeyedProcessFunction<String, Foo, Bar> {
private transient Counter counter;
@Override
public void processElement(Foo value, Context ctx, Collector<Bar> out) throws Exception {
if (counter == null) {
counter = getRuntimeContext().getMetricGroup().addGroup("key", value.getKey()).counter("my_counter"):
}
counter.inc()
out.collect(new Bar(foo));
}
}
但是我仍然得到:
Name collision: Group already contains a Metric with the name 'my_counter'. Metric will not be reported.
有什么我能做的吗?也许有更好的办法,我没找到?
我的想法是得到任何计数器已经存在,但我找不到一种方法来做到这一点,不是反射…
给定的KeyedProcessFunction
实例跨许多不同的键进行多路复用。当上下文中没有特定的键时,open
方法只被调用一次,因此您不能在那里创建每个键的指标。
Counter
——而不是每个KeyedProcessFunction
实例创建一个。其次,我不确定addGroup("key", value.getKey())
是否会在这里工作。如果你在解决第一个问题后仍然有问题,那么也许可以尝试addGroup("key").addGroup(value.getKey())
代替。
最后,如果你的键空间很大,特别是如果它是无界的,那么整个方法是有问题的。Flink的度量系统并不是为了扩展到大量的度量而设计的。