How to chain a count window and a time window?



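I'm trying to set up a stream that first applies a countWindow. The results emitted by the countWindow then need to be passed on to a separate timeWindow. The problem is that the time window does not emit any results.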

I came up with a very simple program that demonstrates the problem:

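import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env
  .addSource(new RichSourceFunction[Int] {
    // Flag so that cancel() actually stops the emitting loop
    @volatile private var running = true
    override def cancel(): Unit = { running = false }
    override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
      var i = 0
      while (running) {
        println(s"Source emits element ${i}")
        ctx.collect(i)
        i = i + 1
        Thread.sleep(1000)
      }
    }
  })
  // Stage 1: sliding count window (size 2, slide 1) on a single key
  .keyBy(new KeySelector[Int, String] {
    override def getKey(value: Int): String = {
      println("getKey 1")
      "KEY1"
    }
  })
  .countWindow(2, 1)
  .reduce(new ReduceFunction[Int] {
    override def reduce(value1: Int, value2: Int): Int = {
      println("reduce 1")
      value1
    }
  })
  // Stage 2: 5-second tumbling time window on the count-window results
  .keyBy(new KeySelector[Int, String] {
    override def getKey(value: Int): String = {
      println("getKey 2")
      "KEY2"
    }
  })
  .timeWindow(Time.seconds(5))
  .reduce(new ReduceFunction[Int] {
    override def reduce(value1: Int, value2: Int): Int = {
      println("reduce 2")
      value1
    }
  })
  .print()

env.execute()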

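With the code above, I would expect one element to be printed every 5 seconds. However, that is not what happens. The actual output shows that print is reached only once: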

Source emits element 0
getKey 1
getKey 2
getKey 2
1> 0
Source emits element 1
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 2
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 3
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 4
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 5
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 6
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 7
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 8
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 9
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 10
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 11
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2
Source emits element 12
getKey 1
getKey 1
reduce 1
getKey 2
getKey 2

Interesting example. If you switch from IngestionTime to ProcessingTime, the example behaves as expected.

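Poking around in the debugger, what I see is that with IngestionTime the StreamRecords produced by the CountWindow no longer carry valid timestamps. A count window is a GlobalWindow, and the results it emits are stamped with that window's maximum timestamp, Long.MAX_VALUE. Under IngestionTime, timeWindow(...) builds event-time windows that depend on record timestamps and watermarks, so these records end up in a window that cannot fire normally. Under ProcessingTime, timeWindow(...) uses the wall clock and ignores record timestamps, which is why that variant works.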
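To see why the timestamp matters, here is a simplified, stdlib-only Scala model of an event-time tumbling window. It is not Flink's implementation: `EventTimeWindowModel`, `windowStart` and `hasFired` are hypothetical names, and the model assumes only that a window [start, end) fires once the watermark reaches end - 1. A record stamped with a real ingestion time lands in a window that fires as soon as the watermark catches up, while a record stamped `Long.MaxValue` (assumed here to be what the count-window result carries) lands in a window no realistic watermark ever reaches.

```scala
// Simplified model of event-time tumbling windows (NOT Flink code).
object EventTimeWindowModel {
  val WindowSizeMs = 5000L

  // Start of the tumbling window containing `ts`; floorMod keeps the
  // result correct for negative timestamps as well.
  def windowStart(ts: Long): Long =
    ts - java.lang.Math.floorMod(ts, WindowSizeMs)

  // True if the window holding `ts` has fired by the given watermark.
  // The window end is computed without overflow: for timestamps close to
  // Long.MaxValue the window is treated as ending at Long.MaxValue.
  def hasFired(ts: Long, watermark: Long): Boolean = {
    val start = windowStart(ts)
    val maxTs =
      if (start > Long.MaxValue - WindowSizeMs) Long.MaxValue
      else start + WindowSizeMs - 1
    watermark >= maxTs
  }
}
```

In this idealized model such records simply never fire on an unbounded stream. The single `1> 0` in the real output presumably comes from how Flink itself handles such an extreme timestamp, which the sketch does not try to reproduce.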

To fix this, you need to re-establish timestamps and watermarks after the CountWindow, like this:

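...
  .countWindow(2, 1)
  .reduce(new ReduceFunction[Int] {
    override def reduce(value1: Int, value2: Int): Int = {
      println("reduce 1")
      value1
    }
  })
  // Re-stamp each count-window result with the current ingestion time and
  // generate fresh watermarks, so the downstream event-time window can fire.
  // Needs: import org.apache.flink.streaming.api.functions.IngestionTimeExtractor
  .assignTimestampsAndWatermarks(new IngestionTimeExtractor[Int]())
  .keyBy(new KeySelector[Int, String] {
    override def getKey(value: Int): String = {
      println("getKey 2")
      "KEY2"
    }
  })
...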
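What the re-stamping step does can be sketched with a small stdlib-only Scala model, loosely in the spirit of an ingestion-time assigner. `IngestionStamper` and `IngestionDemo` are hypothetical names, and the clock is injected (instead of calling `System.currentTimeMillis()`) purely so the behaviour can be tested: each element gets the current time, kept monotonic, and the watermark trails the latest time by 1 ms. Because re-stamped records carry recent timestamps, their 5-second window fires once the watermark passes the window end.

```scala
// Hypothetical, stdlib-only model of an ingestion-time assigner.
final class IngestionStamper(clock: () => Long) {
  private var maxTimestamp = Long.MinValue

  // Stamp an element with the current time, never going backwards
  // even if the clock jumps back.
  def stamp(): Long = {
    maxTimestamp = math.max(clock(), maxTimestamp)
    maxTimestamp
  }

  // Watermark trails the latest time seen by 1 ms.
  def watermark(): Long = {
    maxTimestamp = math.max(clock(), maxTimestamp)
    maxTimestamp - 1
  }
}

object IngestionDemo {
  val WindowSizeMs = 5000L

  // Exclusive end of the 5-second tumbling window containing `ts`.
  def windowEnd(ts: Long): Long =
    ts - java.lang.Math.floorMod(ts, WindowSizeMs) + WindowSizeMs

  // Window [start, end) fires once the watermark reaches end - 1.
  def fired(ts: Long, watermark: Long): Boolean =
    watermark >= windowEnd(ts) - 1
}
```

Keeping the stamps monotonic means a clock that briefly jumps backwards cannot move an element into a window that has already fired.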

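A similar technique also works with event time: after the count window, call assignTimestampsAndWatermarks with an assigner that extracts the event timestamp from the elements themselves. Since the count window's output timestamp is not usable, the event time has to travel inside the element.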
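A stdlib-only Scala sketch of that event-time variant, under stated assumptions: the hypothetical `Reading` type carries its own event time, the count-window reduce keeps the later of the two event times, and `Assigner` is a simplified model of a bounded-out-of-orderness assigner (in Flink 1.x, `BoundedOutOfOrdernessTimestampExtractor` plays a comparable role) whose watermark trails the largest timestamp seen by a fixed bound.

```scala
// Hypothetical element type: the event time is part of the payload,
// because the count window's output timestamp cannot be trusted.
final case class Reading(eventTime: Long, value: Int)

object EventTimeRestamp {
  // Count-window reduce: keep the first value (as in the question) but
  // carry forward the later event time.
  def reduce(a: Reading, b: Reading): Reading =
    Reading(math.max(a.eventTime, b.eventTime), a.value)

  // Simplified bounded-out-of-orderness assigner model.
  final class Assigner(maxOutOfOrdernessMs: Long) {
    // Start so that the initial watermark is exactly Long.MinValue.
    private var maxSeen = Long.MinValue + maxOutOfOrdernessMs

    // Use the event time stored in the element as its timestamp.
    def extractTimestamp(r: Reading): Long = {
      maxSeen = math.max(maxSeen, r.eventTime)
      r.eventTime
    }

    // Watermark lags the largest timestamp seen by the bound.
    def watermark: Long = maxSeen - maxOutOfOrdernessMs
  }
}
```

A late element (event time below the current maximum) is still stamped with its own time, but it does not pull the watermark back.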
