我需要一些关于ApacheFlink中timerService的帮助。我的用例非常简单,但我找不到任何明确的答案。
我的程序从一个源(在我的例子中是rabbitMQ(接收json格式的事件(映射到MyEvent,这里简化了MyEvent(。要么立即处理事件(1(,要么存储以供后续处理(2(。(2( 我认为TimerService是一个合适的解决方案。在onTimer方法中,我需要整个对象(MyEvent(,而不仅仅是键。因此,首先我认为使用整个json作为密钥,这很好,但不知何故,这感觉不对,因为在任何示例中都没有这样使用密钥。第二种方法如下;使用ValueState。但我的密钥不是唯一的,ValueState是每个密钥的。再一次,我可以用一个物体作为钥匙。。。在onTimer中,我只能访问ctx.currentKey中事件的密钥,而不能访问事件本身。。。
所以我的问题是:无论物体看起来是什么样子,我如何才能将整个事件存储起来,以便稍后进行处理?
这是代码(kotlin(
data class MyEvent(val event: String, val secs: Int)
class CountWithTimeoutFunction : KeyedProcessFunction<String, MyEvent, MyEvent>() {
private lateinit var state: ValueState<MyEvent>
override fun open(parameters: Configuration?) {
state = runtimeContext.getState(ValueStateDescriptor("myState", MyEvent::class.java))
}
override fun processElement(myEvent: MyEvent, context: Context, collector: Collector<MyEvent>) {
println("" + Date() + "-processElement-" + myEvent)
state.update(myEvent)
context.timerService().registerProcessingTimeTimer(context.timerService().currentProcessingTime() + myEvent.secs * 1000)
}
override fun onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector<MyEvent>) {
val myEvent = state.value()
println("" + Date() + "-onTimer-" + ctx.currentKey + " state.value()_" + myEvent)
}
}
谢谢你的建议。
对于这种情况,一种常见而直接的技术是通过向用随机数填充的事件添加一个字段,为每个事件提供一个唯一的密钥。(请注意,由于Flink依赖于具有确定性的密钥,因此执行keyBy(random.nextLong())
将不起作用。(
有时使用的另一种技术是使用MapState
,其中键是计时器时间戳,值是等待该计时器的事件列表。当事件到达时,将它们附加到列表中作为时间戳。当计时器启动时,处理列表中的所有内容,然后将其丢弃。
第二种方法将使用更少的定时器,但可能效率较低(至少在RocksDB状态后端(,因为处理这些列表会带来开销。