在WindowAssigner
中,一个元素被分配给一个或多个TimeWindow
实例。在滑动事件时间窗口的情况下,这发生在SlidingEventTimeWindows#assignWindows
1。
在具有size=5
和slide=1
的窗口的情况下,具有timestamp=0
的元素被分配到以下窗口:
- 窗口(开始= 0,结束= 5)
- 窗口(开始= 1,结束= 4)
- 窗口(开始= 2,结束= 3)
- 窗口(开始= 3,结束= 2)
- 窗口(开始= 4,结束= 1)
在一张图中:
+-> Beginning of time
|
|
+----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[-1,4[ Window 2 XXXXX |
| t=[-2,3[ Window 3 XXXXX |
| t=[-3,2[ Window 4 XXXXX |
| t=[-4,1[ Window 5 XXXXX |
| |
| time(-4 to +4) ---- |
| 432101234 |
+---------------------------+------------------+
|
|
|
+
有没有办法告诉Flink有一个时间的开始,在此之前,没有窗口?如果不是,从哪里开始寻求改变呢?在上面的例子中,Flink应该只有一个用于第一个元素的窗口(t=[4,8[ Window 1
)。这样的:
+-> Beginning of time
|
|
+-----------------------------------------------+
| size = 5 +--+ element |
| slide = 1 | |
| v |
| t=[ 0,5[ Window 1 XXXXX |
| t=[ 1,6[ Window 2 XXXXX |
| t=[ 2,7[ Window 3 XXXXX |
| t=[ 3,8[ Window 4 XXXXX |
| t=[ 4,9[ Window 5 XXXXX |
| |
| time(-4 to +8) ---- |
| 4321012345678 |
+---------------------------+-------------------+
|
|
|
+
一旦窗口数量达到并超过窗口大小,这将不再起作用。那么,在上面的例子中,所有的元素都在5个Windows内。
脚注:
-
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows#assignWindows
目前没有办法指定Flink作业的有效时间间隔。考虑到您可能还希望将作业应用于历史数据,这也可能会有一点问题。
你可以做的是,手动过滤在超时开始前开始的窗口:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val startTime = 1
val windowLength = 2
val slide = 1
val input = env.fromElements((1,1), (2,2), (3,3))
.assignAscendingTimestamps(x => x._2)
val windowed = input
.timeWindowAll(Time.milliseconds(windowLength), Time.milliseconds(slide))
.apply{ (window, iterable, collector: Collector[Int]) =>
if (window.getStart >= startTime) {
collector.collect(iterable.map(_._1).reduce(_ + _))
} else {
// discard early windows
}
}
windowed.print()
env.execute()
以下是AssignerWithPeriodicWatermarks[T]
的概念证明:
class WMG[T](wait: Long) extends AssignerWithPeriodicWatermarks[T] {
var t: Option[Long] = None
var firstTime = true
override def extractTimestamp(el: T, prevTs: Long): Long = {
t = Some(prevTs)
prevTs
}
override def getCurrentWatermark(): Watermark = (t, firstTime) match {
case (None, _) => return null
case (Some(v), false) => new Watermark(v)
case (Some(v), true) => {
firstTime = false
new Watermark(v + wait)
}
}
}
Update:不幸的是,它不起作用(现在我不知道为什么要这样),在键流中总是有几个键与"早期窗口"。所以最后我只是过滤错误的窗口,如:
val s = (winSize/winStep).intValue
kstream.flatMapWithState((in: StreamOut, state: Option[Int]) =>
state match {
case None => (Seq(), Some(1))
case Some(s) => (Seq(in), Some(s))
case Some(v) => (Seq(), Some(v+1))
})