我正在将Flink 1.9.1集群升级到Flink 1.15.0集群。这需要我在Flink Job代码中更新Flink api。
更新后的作业在Flink 1.15.0上运行良好,除了它没有发出任何应用程序级别的指标。
根据文档,度量标准注册如下:
private Counter numEvents = getRuntimeContext().getMetricGroup().counter("foo.bar.numEvents");
// and later
numEvents.inc();
// etc.
然后flink-metrics-statsd
插件处理它将度量发送到StatsD端点的部分。
我已经确认在StatsD端点上使用tcpdump正在发送系统级指标,但是我在代码中显式注册的指标无处可看。
相同的指标注册代码在旧的Flink 1.9.1运行良好,并发出所有这些指标。根据1.15.0文档,我的代码似乎是正确的。https://nightlies.apache.org/flink/flink - docs -释放- 1.15 -/- docs/ops/metrics/
在Flink日志中,我可以看到以下错误:
2022-07-19 16:50:07,975 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Error while reporting metrics
java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Double (java.lang.String and java.lang.Double are in module java.base of loader 'bootstrap')
at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27) ~[?:?]
at org.apache.flink.metrics.statsd.StatsDReporter.reportGauge(StatsDReporter.java:136) ~[?:?]
at org.apache.flink.metrics.statsd.StatsDReporter.report(StatsDReporter.java:106) ~[?:?]
at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:495) [flink-dist-1.15.0.jar:1.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
我不确定这是否有关系。
如果有人知道这里发生了什么,那就太好了。谢谢。🙏
你要么是这两种情况中的一种:
- https://issues.apache.org/jira/browse/FLINK-27487 -升级到最新的Flink 1.15(目前为Flink 1.15.1)应该解决这个问题。
- https://issues.apache.org/jira/browse/FLINK-28488如果你使用的是旧的flinkkafkconsumerer/FlinkKafkaProducer。这个问题尚未解决。