如何查找Flink中连续事件之间的事件时差



我想找出每两个连续输入事件之间的事件时间差。如果时间差高于某个阈值,那么我想输出一个事件,表明阈值已被突破。我还希望流的第一个事件总是输出这个违反信号,作为它没有以前的事件来计算时间差的指示。

我尝试使用Flink的CEP库,因为它可以确保事件按事件时间排序。

我创建的模式如下:

Pattern.begin("begin").optional().next("end");

我使用optional()子句来处理第一个事件,因为我认为第一个事件将是"begin"没有值的唯一事件。

当我输入事件a1 a2 a3 a4 a5时,我得到以下输出匹配:

{a1} {a1 a2} {a2} {a2 a3} {a3} {a3 a4} {a4} {a4 a5}...

然而,我想要以下内容,因为它将允许我计算每个连续事件之间的时间差。

{a1} {a1 a2} {a2 a3} {a3 a4} {a4 a5}...

我尝试过使用不同的AfterMatchSkipStrategy设置以及IterativeCondition子句,但没有成功。

标记"开始";可选是导致不需要的匹配的原因。我会寻找其他方法来为第一个事件生成违反信号——例如,也许你可以准备一个虚拟的第一个事件。

另一种方法是只使用CEP或SQL对流进行排序,然后使用RichFlatMap或有状态流程函数来实现业务逻辑:即计算差异并生成违规信号。

请参阅我可以使用Flink CEP对流进行排序吗?了解如何做到这一点。

最新更新