为什么我在 flink 会话窗口中每个键都会得到多个窗口



我的要求是检查会话中的开始事件和成功事件。当然,我使用会话窗口。但似乎每个键都有重叠的窗口。我在网上搜索过,不知道为什么。

数据格式:myForm(timestamp, roomId, role, sessionId, event),例如:

myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605  
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844  
myform(1559129977, 456, kid, 38239, begin) # timestamp equals to 2019-05-29 19:39:37  
...

会话可能只有一对开始事件和成功事件,也可能有几对开始事件和成功事件。
活动可能会迟到,并允许迟到最多 3 分钟。

我的关键是roomId+role+sessionId,比如"123_kid_37890",seesionGap是 60 年代

// use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = ... // from kafka, steam of myform
val sessionStream = stream
.assignTimestampsAndWatermarks(new MyFormEventWatermarks(0L))
.keyBy(mf => mf.roomId + "_" + mf.role + "_" + mf.sessionId)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(60 * 1000L))      
.allowedLateness(Time.minutes(3))
.apply(myFormWindowFunction)
//MyFormEventWatermarks is :
class MyFormEventWatermarks[T <: AbstractForm](dely: Long) extends AssignerWithPeriodicWatermarks[T] {
private var currentMaxTimestamp = Long.MinValue
val maxOutOfOrderness = dely
@transient
var waterMark : Watermark = null
override def getCurrentWatermark: Watermark = {
if (currentMaxTimestamp == Long.MinValue){
waterMark = new Watermark(Long.MinValue)
waterMark
}
else{
waterMark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
waterMark
}
}
override def extractTimestamp(data: T, previousElementTimestamp: Long): Long = {
val timestamp = data.timestamp
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
timestamp
}

}

//window func is 
class myFormWindowFunction extends RichWindowFunction ... {
...
override def apply(key: String, window: TimeWindow, input: Iterable[myForm], out: Collector[List[myForm]]): Unit = {
println("window is " + window.getStart() + "-" + window.getEnd() + "|" + data.tostring)
}
...
}

myFormWindowFunction的方法apply中,println的结果如下:

// like this session data:
myform(1559128942, 123, kid, 37890, begin) # timestamp equals to 2019-05-29 19:22:22.605  
myform(1559128944, 123, kid, 37890, success) # timestamp equals to 2019-05-29 19:22:24.844  

我得到了一个2019-05-29 19:22:22.605- 2019-05-29 19:23:22.605窗口,数据myform(1559128942, 123, kid, 37890, begin),然后我得到了第二个2019-05-29 19:22:22.605 - 2019-05-29 19:23:24.844窗口,数据myform(1559128942, 123, kid, 37890, begin), myform(1559128944, 123, kid, 37890, success)。 它看起来像窗口初始化为 (2019-05-29 19:22:22.605, 2019-05-29 19:23:22.605) 和 (2019-05-29 19:22:24.844, 2019-05-29 19:23:24.844),并且onMerge方法合并但未"丢弃"窗口 (2019-05-29 19:22:22.605, 2019-05-29 19:23:22.605)。我已经查找了EventTimeSessionWindows的源函数和 flink 会话窗口的示例,但仍然不知道程序出了问题?

在允许的延迟间隔期间分配给窗口的事件的默认行为是在将每个延迟事件添加到窗口时触发窗口 - 但也可以实现一个自定义触发器,该触发器在允许的延迟到期时触发,而不是其他触发,或者作为其他触发的补充。

请注意,对于会话窗口,延迟到达的事件可能会导致延迟合并。

您可能需要考虑水印延迟和允许的延迟之间的权衡。由于水印延迟为零,因此可能会有相当多的延迟事件(每次事件流都按时间戳不完全按顺序排列)。相反,例如,如果您使用 3 分钟作为水印延迟并将允许延迟设置为零,那么您将产生相同的最终结果,但没有延迟触发和延迟合并 - 但在初始触发每个窗口之前有 3 分钟的延迟。

我发现了问题,我对 allowedLateness 产生了误解。使用时,窗口被保存,当窗口+允许迟到时间到达时,窗口将再次触发。

相关内容

  • 没有找到相关文章

最新更新