我有flink流,我在某个时间窗口上刻有几件事。
在这里发生了什么,它给我带来了我以前的整个窗口。
说前30秒我得到结果10。
我想要新的结果,而是要获得最后的窗口结果 新的结果等等。
所以我的问题是我如何为每个窗口获得新的结果。
您需要使用清除触发器。您想要的是fire_and_purge(发出和删除窗口内容(,默认的flink触发器的作用是fire(emit和保留窗口内容(。
input
.keyBy(...)
.timeWindow(Time.seconds(30))
// The important part: Replace the default non-purging ProcessingTimeTrigger
.trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger))
.reduce(...)
更深入的解释可以查看触发器和fire vs fire_and_purge。
触发器确定何时准备通过窗口函数处理窗口(由窗口分配程序形成(。每个WindowSAssigner都带有一个默认的触发器。如果默认触发器不符合您的需求,则可以使用触发器(...(指定自定义触发器。
当触发发射时,它可以发射或fire_and_purge。fire 保留窗口的内容,而fire_and_purge 删除其内容。默认情况下,预先实现的触发器只是在不清除窗口状态的情况下发射。
您描述的功能可以在翻滚窗口中找到:https://ci.apache.org/projects/flink/flink/flink/flink-docs-release-1.2/dev/windows.html#翻滚窗户
更多细节和/或代码将有所帮助:(
我迟到了这个问题,但是我遇到了同样的问题。我后来发现的是我自己的代码中的一个错误。仅供参考,我的错误可能是您的问题的很好的参考。
// Old code (modified to be an example):
val tenSecondGrouping: DataStream[MyCustomGrouping] = userIdsStream
.keyBy(_.somePartitionedKey)
.window(TumblingProcessingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
.trigger(ProcessingTimeTrigger.create())
.aggregate(new MyCustomAggregateFunc(new MyCustomGrouping()))
错误发生在新的mycustomGrouping :我无意间创建了一个单旋mycustomgrouping对象,并在mycustomaggregatefund中重复使用。随着越来越多的窗户的创建,后来的聚合结果变得疯狂了!修复程序是每次触发mycustomaggregatefunc时都会创建新的mycustomgroup。所以:
// New code, problem solved
...
.aggregate(new MyCustomAggregateFunc(() => new MyCustomGrouping()))
// passing in a func to create new object per trigger