Flink-如何在状态中聚合



我有一个keyd数据流,看起来像:

{
summary:Integer
uid:String
key:String
.....
}

我需要聚合某个时间范围内的摘要值,一旦我获得特定的编号,就可以将摘要和所有影响摘要的UID刷新到数据库/日志文件中。

在第一次刷新之后,我想丢弃内存中的所有uid,并立即刷新每个新项目。

所以我尝试了这个聚合函数。

public class AggFunc implements AggregateFunction<Item, Acc, Tuple2<Integer,List<String>>>{
private static final long serialVersionUID = 1L;
@Override
public Acc createAccumulator() {
return new Acc());
}
@Override
public Acc add(Item value, Acc accumulator) {
accumulator.inc(value.getSummary());
accumulator.addUid(value.getUid);
return accumulator;
}
@Override
public Tuple2<Integer,List<String>> getResult(Acc accumulator) {
List<String> newL = Lists.newArrayList(accumulator.getUids());
accumulator.setUids(Lists.newArrayList());
return Tuple2.of(accumulator.getSum(), newL);
}
@Override
public Acc merge(Acc a, Acc b) {
.....
}
}

在聚合过程函数中,我将列表刷新到状态,如果我需要保存到数据库,我将清除状态并保存状态中的标志以指示它。

但对我来说,这似乎是不正确的。我不确定这对我是否有效。

有更好的解决方案吗?

在富函数中处理状态。在您的状态和窗口触发刷新值时,继续添加uid。官方文件中的这一页有一个例子。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-键控状态

对于您的情况,ListState会很好地工作。

编辑:

以上解决方案适用于非窗口情况。对于窗口情况,只需使用具有应用函数的聚合,该函数可以具有丰富的窗口函数

相关内容

  • 没有找到相关文章

最新更新