我正在构建一个 Flink 流应用程序,并且更喜欢使用事件时间,因为它将确保设置的所有计时器都将在历史数据失败或重放时确定性地触发。事件时间的问题在于,只有当事件进入时,时间才会向前移动。我们的数据源(物理传感器(有时会生成很少的数据,因此有时单个数据点可能会打开一个 5 分钟的聚合窗口,但下一个数据点是在 20 分钟后,因此窗口会关闭并很晚地发出输出记录。
我们对此提出的解决方案是使用计划每 X 分钟运行一次的 AWS lambda 函数,该函数将虚拟事件输出到 Flink 从中读取的 Kinesis 流中,从而强制生成一个水印,从而将时间向前推进。
我担心的是,这只有在水印真正是全局的情况下才有效,这意味着单个心跳消息可能导致创建水印,该水印会提前使用源自此流的数据的 Flink 应用程序中每个运算符/任务的事件时间。文档让我相信 Flink 将来自源的读取与 Flink 相似,其中每个并行读取运算符生成自己的水印,然后下游运算符(例如窗口(获取它看到的各种水印的最小值。如果是这种情况,这对我来说似乎是有问题的,因为每个并行水印生成器都需要一个虚拟的心跳事件,但我无法控制哪些节点从流中读取我的心跳消息。
所以,我的问题是,下游算子究竟如何使用水印来推进事件时间,是否可以将单个虚拟消息添加到 kinesis 流中以在整个 Flink 应用程序中推进事件时间?
如果没有,我怎样才能强制事件时间向前推进?
你是对的;这里有一个问题。BoundedOutOfOrdernessTimestampExtractor
实现的标准定期水印生成器依赖于查看具有较大时间戳的新事件,以便推进水印。
有几种方法可以解决此问题:
-
在并行度为 1 的任务中运行源和水印分配器(如果需要,可以增加管道其余部分的并行度(。这样,一条检测信号消息就足够了。
-
广播检测信号消息。这样,每个并行实例都将收到它们,并且它们都可以推进其水印。
实现水印生成器, 而不是检测信号消息,该生成器使用处理时间计时器人为地推进水印,尽管缺少传入事件。有关示例,请参阅 https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java。
请注意,第三种方法不太理想,因为它创建了与处理时间的耦合,从而消除了纯事件时间方法的一些核心优势。
如果使用检测信号源,则需要为返回MAX_WATERMARK的另一个(有时是空闲的(源实现水印生成器。否则,此流中的水印将保留整体水印。
此外,AWS Lambda感觉有点矫枉过正。你可以实现一个简单的自定义 Flink 源来创建心跳事件。