这个函数(ProcessElement(的作用非常清楚:基于键控流(由rideId键控(,它将迭代rideId属于该键的所有元素,它将根据条件更新状态
override def processElement(ride: TaxiRide,
context: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#Context,
out: Collector[TaxiRide]): Unit = {
val timerService = context.timerService
if (ride.isStart) {
// the matching END might have arrived first; don't overwrite it
if (rideState.value() == null) {
rideState.update(ride)
}
}
else {
rideState.update(ride)
}
timerService.registerEventTimeTimer(ride.getEventTime + 120 * 60 * 1000)
}
一旦水印到达时间戳,计时器就会触发
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#OnTimerContext,
out: Collector[TaxiRide]): Unit = {
val savedRide = rideState.value
if (savedRide != null && savedRide.isStart) {
out.collect(savedRide)
}
rideState.clear()
}
问题是:如果End记录先出现,然后根据逻辑,它不会更新骑行状态(相关密钥(,然后它会在2小时后触发,然后它不会收集也不会发出记录,但如果这个记录符合我们的要求怎么办?===记录的开始时间发生在2个多小时前?我认为应该有更多的逻辑来处理
如果END记录是在START记录之前处理的,那么START记录可能到达得很晚,当它到达时,它提供了这段旅程持续了两个多小时的证据。
然而,这项练习的目标并不是找到所有持续两个多小时的游乐设施,而是实时标记那些本应在现在结束但尚未结束的游乐设施。既然你询问的这些游乐设施已经结束,它们是否值得提醒还有待商榷。
你提出了一个有趣的观点,可能应该添加到练习讨论页面中。