我尝试像这样聚合两个流
val joinedStream = finishResultStream.keyBy(_.searchId)
.connect(startResultStream.keyBy(_.searchId))
.process(new SomeCoProcessFunction)
然后在这样的课堂上SomeCoProcessFunction
研究它们
class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {
override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
// aggregating some "finished" data ...
}
override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
val timerService = ctx.timerService()
timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)
// aggregating some "created" data ...
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {
val watermark: Long = ctx.timerService().currentWatermark()
println(s"watermark!!!! $watermark")
// clean up the state
}
我想要的是在特定时间(5000毫秒(后清理状态,这就是onTimer
必须使用的。但是由于它从未被解雇,我有点问自己,我在这里做错了什么?
提前感谢您的任何提示。
更新:
解决方案是像这样设置时间服务(tnx对Fabian-Hueske和Beckham(:
timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000)
我仍然没有真正弄清楚timerService.registerEventTimeTimer
做什么,水印ctx.timerService().currentWatermark()
显示总是-9223372036854775808
现在在 EventTimer 注册之前多久很重要。
我看到您正在使用的System.currentTimeMillis
可能与您的 Flink 作业使用的TimeCharacteristic
(事件时间、处理时间、摄取时间(不同。
尝试获取事件的时间戳ctx.timestamp()
然后在其上添加 5000 毫秒。
问题是您正在注册一个具有处理时间时间戳 ( System.currentTimeMillis + 5000
( 的事件时间计时器 ( timerService.registerEventTimeTimer
(。
System.currentTimeMillis
返回当前计算机时间,但事件时间不是基于计算机时间,而是基于从水印计算的时间。
应注册处理计时器或注册具有事件时间时间戳的事件时间计时器。您可以从作为参数传递给 processElement1()
和 processElement2()
的 Context
对象中获取当前水印的时间戳或当前记录的时间戳。