我正在玩弗林克公制(1.7),我被困在这里:
我有一个flink的工作,他们处理用户变量,通过Prometheus Reporter公开几个自定义业务指标,例如:
- 客户_ID和国家和数据中心处理的Protobuf消息的数量。
- customer_id和country and type。
- 等。
目标是拥有这样的Prometheus数据:
<job>_<task>_<operator>_my_data_income{website_id="1",country="fr",dc="EUROPE"} 42
我所做的是:
public class MyMetricSink extends RichSinkFunction<AbstractASLogMetricEntity> {
@Override
public void invoke(AbstractASLogMetricEntity value, Context context) {
getRuntimeContext()
.getMetricGroup()
.addGroup( "website_id" , value.getCustomer() )
.addGroup( "country" , value.getCountry() )
.addGroup( "dc" , value.getDatacenter() )
.addGroup( "my_data" )
.counter( "income" )
.inc( value.getIncome() );
}
@Override
public void open(Configuration parameters) {
}
}
不工作 =>警告日志: "Name collision: Group already contains a Metric with the name..."
来自AbstractMetricGroup类。
为了使该工作正确,我需要先创建open
方法上的所有计数器。因此,我将拥有我的自定义指标的笛卡尔产品。可能是存储在哈希图上的1.2百万个计数器。
你们有什么建议可以避免这种情况吗?这是正确的方法吗?可以通过在某个地方使用工作来完成我想做的事情?
thx
这在某种程度上超出了Flink Metric System旨在支持的范围。一些尝试与许多指标一起跑步的人都报告了问题。
根据您最终试图完成的工作,您可能会考虑使用连接到Prometheus(或Grafana)的流入式水槽,或者可能会使用Elasticsearch Sink。也许可查询状态更适合。