Flink 的 CoProcessFunction 不会触发 onTimer



我尝试像这样聚合两个流

val joinedStream = finishResultStream.keyBy(_.searchId)
  .connect(startResultStream.keyBy(_.searchId))
  .process(new SomeCoProcessFunction)

然后在这样的课堂上SomeCoProcessFunction研究它们

class SomeCoProcessFunction extends CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated] {
   override def processElement1(finished: SearchFinished, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = { 
       // aggregating some "finished" data ...
   }
   override def processElement2(created: SearchCreated, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#Context, out: Collector[SearchAggregated]): Unit = {
       val timerService = ctx.timerService()
       timerService.registerEventTimeTimer(System.currentTimeMillis + 5000)
       // aggregating some "created" data ...
   }
   override def onTimer(timestamp: Long, ctx: CoProcessFunction[SearchFinished, SearchCreated, SearchAggregated]#OnTimerContext, out: Collector[SearchAggregated]): Unit = {
       val watermark: Long = ctx.timerService().currentWatermark()
       println(s"watermark!!!! $watermark")
       // clean up the state
   }

我想要的是在特定时间(5000毫秒(后清理状态,这就是onTimer必须使用的。但是由于它从未被解雇,我有点问自己,我在这里做错了什么?

提前感谢您的任何提示。

更新:

解决方案是像这样设置时间服务(tnx对Fabian-Hueske和Beckham(:

timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5000)

我仍然没有真正弄清楚timerService.registerEventTimeTimer做什么,水印ctx.timerService().currentWatermark()显示总是-9223372036854775808现在在 EventTimer 注册之前多久很重要。

我看到您正在使用的System.currentTimeMillis可能与您的 Flink 作业使用的TimeCharacteristic(事件时间、处理时间、摄取时间(不同。

尝试获取事件的时间戳ctx.timestamp()然后在其上添加 5000 毫秒。

问题是您正在注册一个具有处理时间时间戳 ( System.currentTimeMillis + 5000 ( 的事件时间计时器 ( timerService.registerEventTimeTimer (。

System.currentTimeMillis返回当前计算机时间,但事件时间不是基于计算机时间,而是基于从水印计算的时间。

应注册处理计时器或注册具有事件时间时间戳的事件时间计时器。您可以从作为参数传递给 processElement1()processElement2()Context 对象中获取当前水印的时间戳或当前记录的时间戳。

相关内容

  • 没有找到相关文章

最新更新