使用 apache flink 进行数据聚合



我正在尝试使用 flink 数据流 API 创建一个实时数字计数器。但是我面临着一些问题来实现解决方案。

例: 数据负载

{
"room": 1,  # Room Number
"numbers": [101, 111, 201, 211, 13, ....], # Only these numbers in output with count
"my_number": 401  # My Current Number according to room
}

只有4间客房1,2,3和4,my_number将根据房间而有所不同。 这是我传递给 flink 的流数据。

问题陈述: 我想根据房间计算数字,并在输出中只想返回数组数字及其计数。每个房间都是一样的。

output example:
[
{
101: 2,
111: 5,
201: 1
.
.
.
}
]

如果我理解正确,你可以这样做:

dataPayloadSource.keyby("room").process(new CountNumbers()).flatMap(new MapDataPayloadToCorrespondObject())addSink(...);
// ...
public class CountNumbers extends KeyedProcessFunction<..>{
private MapState<Integer, Integer> numberCountState;

public void open(Configuration config){
// initialize state in here
}

public void processElement(DataPayload dp){
// for each numbers in the dp.counts, get the state value with numberCountState.get(..)
// check it returns null, if yes, map does not have that key, initialize with 1
// if not null, then get the current value from the map, increment by 1
// update the mapstate
} 
}
// ...
public class MapDataPayloadToCorrespondObject extends RichFlatMapFunction<...>{
public void flatMap(...){
// convert DataPayload to OutputObject
}
}

相关内容

  • 没有找到相关文章

最新更新