Flink Streaming窗口触发器



我有flink流,我在某个时间窗口上刻有几件事。

在这里发生了什么,它给我带来了我以前的整个窗口。

说前30秒我得到结果10。

我想要新的结果,而是要获得最后的窗口结果 新的结果等等。

所以我的问题是我如何为每个窗口获得新的结果。

您需要使用清除触发器。您想要的是fire_and_purge(发出和删除窗口内容(,默认的flink触发器的作用是fire(emit和保留窗口内容(。

input
    .keyBy(...)
    .timeWindow(Time.seconds(30))
    // The important part: Replace the default non-purging ProcessingTimeTrigger
    .trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger))
    .reduce(...)

更深入的解释可以查看触发器和fire vs fire_and_purge。

触发器确定何时准备通过窗口函数处理窗口(由窗口分配程序形成(。每个WindowSAssigner都带有一个默认的触发器。如果默认触发器不符合您的需求,则可以使用触发器(...(指定自定义触发器。

当触发发射时,它可以发射或fire_and_purge。fire 保留窗口的内容,而fire_and_purge 删除其内容。默认情况下,预先实现的触发器只是在不清除窗口状态的情况下发射。

您描述的功能可以在翻滚窗口中找到:https://ci.apache.org/projects/flink/flink/flink/flink-docs-release-1.2/dev/windows.html#翻滚窗户

更多细节和/或代码将有所帮助:(

我迟到了这个问题,但是我遇到了同样的问题。我后来发现的是我自己的代码中的一个错误。仅供参考,我的错误可能是您的问题的很好的参考。

// Old code (modified to be an example):
val tenSecondGrouping: DataStream[MyCustomGrouping] = userIdsStream
      .keyBy(_.somePartitionedKey)
      .window(TumblingProcessingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
      .trigger(ProcessingTimeTrigger.create())
      .aggregate(new MyCustomAggregateFunc(new MyCustomGrouping()))

错误发生在新的mycustomGrouping :我无意间创建了一个单旋mycustomgrouping对象,并在mycustomaggregatefund中重复使用。随着越来越多的窗户的创建,后来的聚合结果变得疯狂了!修复程序是每次触发mycustomaggregatefunc时都会创建新的mycustomgroup。所以:

// New code, problem solved
          ...
          .aggregate(new MyCustomAggregateFunc(() => new MyCustomGrouping())) 
// passing in a func to create new object per trigger

相关内容

  • 没有找到相关文章

最新更新