Apache Flink:窗口函数和时间的开始



WindowAssigner中,一个元素被分配给一个或多个TimeWindow实例。在滑动事件时间窗口的情况下,这发生在SlidingEventTimeWindows#assignWindows 1

在具有size=5slide=1的窗口的情况下,具有timestamp=0的元素被分配到以下窗口:

  1. 窗口(开始= 0,结束= 5)
  2. 窗口(开始= 1,结束= 4)
  3. 窗口(开始= 2,结束= 3)
  4. 窗口(开始= 3,结束= 2)
  5. 窗口(开始= 4,结束= 1)

在一张图中:

                            +-> Beginning of time
                            |
                            |
+----------------------------------------------+
|     size = 5              +--+ element       |
|    slide = 1              |                  |
|                           v                  |
| t=[ 0,5[ Window 1         XXXXX              |
| t=[-1,4[ Window 2        XXXXX               |
| t=[-2,3[ Window 3       XXXXX                |
| t=[-3,2[ Window 4      XXXXX                 |
| t=[-4,1[ Window 5     XXXXX                  |
|                                              |
| time(-4 to +4)        ----                   |
|                       432101234              |
+---------------------------+------------------+
                            |
                            |
                            |
                            +

有没有办法告诉Flink有一个时间的开始,在此之前,没有窗口?如果不是,从哪里开始寻求改变呢?在上面的例子中,Flink应该只有一个用于第一个元素的窗口(t=[4,8[ Window 1)。这样的:

                            +-> Beginning of time
                            |
                            |
+-----------------------------------------------+
|     size = 5              +--+ element        |
|    slide = 1              |                   |
|                           v                   |
| t=[ 0,5[ Window 1         XXXXX               |
| t=[ 1,6[ Window 2          XXXXX              |
| t=[ 2,7[ Window 3           XXXXX             |
| t=[ 3,8[ Window 4            XXXXX            |
| t=[ 4,9[ Window 5             XXXXX           |
|                                               |
| time(-4 to +8)        ----                    |
|                       4321012345678           |
+---------------------------+-------------------+
                            |
                            |
                            |
                            +

一旦窗口数量达到并超过窗口大小,这将不再起作用。那么,在上面的例子中,所有的元素都在5个Windows内。


脚注:

  1. org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows

目前没有办法指定Flink作业的有效时间间隔。考虑到您可能还希望将作业应用于历史数据,这也可能会有一点问题。

你可以做的是,手动过滤在超时开始前开始的窗口:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val startTime = 1
val windowLength = 2
val slide = 1
val input = env.fromElements((1,1), (2,2), (3,3))
               .assignAscendingTimestamps(x => x._2)
val windowed = input
      .timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
      .apply{ (window, iterable, collector: Collector[Int]) =>
         if (window.getStart >= startTime) {
           collector.collect(iterable.map(_._1).reduce(_ + _))
         } else {
           // discard early windows
         }
       }
windowed.print()
env.execute()

我可能会找到更好的解决方法。这个想法是将水印设置到未来足够远的点,这样就会有足够的数据用于您的窗口。早期的窗口仍然存在,但它们将被丢弃。

以下是AssignerWithPeriodicWatermarks[T]的概念证明:

  class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
    var t: Option[Long] = None
    var firstTime = true
    override def extractTimestamp(el: T, prevTs: Long): Long = {
      t = Some(prevTs)
      prevTs
    }
    override def getCurrentWatermark(): Watermark = (t, firstTime) match {
      case (None, _) => return null
      case (Some(v), false) => new Watermark(v)
      case (Some(v), true) => {
        firstTime = false
        new Watermark(v + wait)
      }
    }
  }
"等待"是第一个窗口的大小。似乎可以正常工作,但我不太了解flink。

Update:不幸的是,它不起作用(现在我不知道为什么要这样),在键流中总是有几个键与"早期窗口"。所以最后我只是过滤错误的窗口,如:

val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>      
  state match {
    case None    => (Seq(), Some(1))
    case Some(s) => (Seq(in), Some(s))
    case Some(v) => (Seq(), Some(v+1))
  })

最新更新