Flink Metric用户变量



我正在玩弗林克公制(1.7),我被困在这里:

我有一个flink的工作,他们处理用户变量,通过Prometheus Reporter公开几个自定义业务指标,例如:

  • 客户_ID和国家和数据中心处理的Protobuf消息的数量。
  • customer_id和country and type。
  • 等。

目标是拥有这样的Prometheus数据:

<job>_<task>_<operator>_my_data_income{website_id="1",country="fr",dc="EUROPE"} 42

我所做的是:

public class MyMetricSink extends RichSinkFunction<AbstractASLogMetricEntity> {
    @Override
    public void invoke(AbstractASLogMetricEntity value, Context context) {
        getRuntimeContext()
            .getMetricGroup()
            .addGroup(  "website_id" , value.getCustomer() )
            .addGroup( "country" , value.getCountry() )
            .addGroup(  "dc" , value.getDatacenter() )
            .addGroup( "my_data" )
            .counter( "income" )
            .inc( value.getIncome() );
     }
    @Override
    public void open(Configuration parameters) {       
    }
}

不工作 =>警告日志: "Name collision: Group already contains a Metric with the name..."来自AbstractMetricGroup类。

为了使该工作正确,我需要先创建open方法上的所有计数器。因此,我将拥有我的自定义指标的笛卡尔产品。可能是存储在哈希图上的1.2百万个计数器。

你们有什么建议可以避免这种情况吗?这是正确的方法吗?可以通过在某个地方使用工作来完成我想做的事情?

thx

这在某种程度上超出了Flink Metric System旨在支持的范围。一些尝试与许多指标一起跑步的人都报告了问题。

根据您最终试图完成的工作,您可能会考虑使用连接到Prometheus(或Grafana)的流入式水槽,或者可能会使用Elasticsearch Sink。也许可查询状态更适合。

相关内容

  • 没有找到相关文章

最新更新