我想通过processKeyedFunction实现aggregationFunction,因为默认的aggregationFunction不支持富函数,此外,我尝试了aggregationFunction+processWindowFunction(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html),但它也不能满足我的需求,所以我不得不使用基本的processKeyedFunction来实现aggregationFunction,我的问题细节如下:
在processFunction中,我为stage定义了一个windowState。元素的聚合值,代码如下:
public void open(Configuration parameters) throws Exception {
followCacheMap = FollowSet.getInstance();
windowState = getRuntimeContext().getMapState(windowStateDescriptor);
currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>(
"timer",
Long.class
));
在processElement((函数中,我使用windowState(这是一个在open函数中启动的MapState(来聚合窗口元素,并注册第一次Servie来清除当前窗口状态,代码如下:
@Override
public void processElement(FollowData value, Context ctx, Collector<FollowData> out) throws Exception
{
if ( (currentTimer==null || (currentTimer.value() ==null) || (long)currentTimer.value()==0 ) && value.getClickTime() != null) {
currentTimer.update(value.getClickTime() + interval);
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
}
windowState = doMyAggregation(value);
}
在onTimer((函数中,首先,我在接下来的一分钟内注册下一个timeService,并清除窗口State
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<FollowData> out) throws Exception {
currentTimer.update(timestamp + interval); // interval is 1 minute
ctx.timerService().registerEventTimeTimer((long)currentTimer.value());
out.collect(windowState);
windowState.clear();
}
但当程序运行时,我发现onTimer中的所有windowState都是空的,但在processElement((函数中却没有empyt,我不知道为什么会发生这种情况,也许执行逻辑不同,我该如何解决这个问题,提前感谢!
关于doMyAggregation((部分的新添加代码
windowState是MapState,key是";mykey";,值是自定义的Object AggregateFollow
public class AggregateFollow {
private String clicked;
private String unionid;
private ArrayList allFollows;
private int enterCnt;
private Long clickTime;
}
doMyAggregation(value(函数很像这样,doMyAg胶gation的函数是获取源字段为"follow"的所有值,但如果在1分钟内没有字段为"click"的值,则"follow(跟随("值应该是过时的,总之,这就像"follow’数据和"click’数据的联接操作,
AggregateFollow acc = windowState.get(windowkey);
String flag = acc.getClicked();
ArrayList<FollowData> followDataList = acc.getAllFollows();
if ("0".equals(flag)) {
if ("follow".equals(value.getSource())) {
followDataList.add(value);
acc.setAllFollows(followDataList);
}
if ("click".equals(value.getSource())) {
String unionid = value.getUnionid();
clickTime = value.getClickTime();
if (followDataList.size() > 0) {
ArrayList listNew = new ArrayList();
for (FollowData followData : followDataList) {
followData.setUnionid(unionid);
followData.setClickTime(clickTime);
followData.setSource("joined_flag"); //
}
acc.setAllFollows(listNew);
}
acc.setClicked("1");
acc.setUnionid(unionid);
acc.setClickTime(clickTime);
windowState.put(windowkey, acc);
}
} else if ("1".equals(flag)) {
if ("follow".equals(value.getSource())) {
value.setUnionid(acc.getUnionid());
value.setClickTime(acc.getClickTime());
value.setSource("joined_flag");
followDataList.add(value);
acc.setAllFollows(followDataList);
windowState.put(windowkey, acc);
}
}
由于性能问题,最初的windowAPI对我来说不是一个有效的选择,我认为这里唯一的方法是使用processFunction+ontimer和Guava Cache,非常感谢
如果windowState
为空,查看doMyAggregation(value)
在做什么会很有帮助。
如果没有更多的上下文,很难对此进行调试或提出好的替代方案,但out.collect(windowState)
不会按预期工作。相反,您可能想做的是迭代这个MapState
,并将它包含的每个键/值对收集到输出中。
我把windowState的类型从MapState改为ValueState,问题就解决了,也许是bug什么的,有人能解释吗?