闪烁流媒体事件时间窗口排序



我在理解事件时间窗口的语义时遇到了一些问题。下面的程序生成一些带有时间戳的元组,用作事件时间,并进行简单的窗口聚合。我希望输出与输入的顺序相同,但输出的顺序不同。为什么输出相对于事件时间不正常?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)
    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))
    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()
    env.execute()
}

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)

这种行为的原因是在Flink中没有考虑元素的顺序(相对于时间戳)。对于考虑时间的操作,只有水印的正确性及其与元素的时间戳的关系才是重要的,因为水印通常会在基于时间的操作中触发计算。

在您的示例中,窗口操作符将源中的所有元素存储在内部窗口缓冲区中。然后,源发出一个水印,表示将来不会有时间戳较小的元素到达。这反过来又告诉窗口操作员处理具有低于水印的结束时间戳的所有窗口(对于所有窗口都是这样)。因此,它发出所有窗口(具有任意顺序),然后它自己发出水印。这之后的操作本身将接收元素,并且一旦接收到水印就可以进行处理。

默认情况下,从源发出水印的间隔为200毫秒。对于源发出的少量元素,所有这些元素都在发出第一个水印之前发出。在真实世界的用例中,水印发射间隔比窗口大小小得多,您可以按照时间戳的顺序获得正在发射的窗口的预期行为。例如,如果每500毫秒有一个小时的窗口和水印。

最新更新