我想找出每两个连续输入事件之间的事件时间差。如果时间差高于某个阈值,那么我想输出一个事件,表明阈值已被突破。我还希望流的第一个事件总是输出这个违反信号,作为它没有以前的事件来计算时间差的指示。
我尝试使用Flink的CEP库,因为它可以确保事件按事件时间排序。
我创建的模式如下:
Pattern.begin("begin").optional().next("end");
我使用optional()
子句来处理第一个事件,因为我认为第一个事件将是"begin"
没有值的唯一事件。
当我输入事件a1 a2 a3 a4 a5
时,我得到以下输出匹配:
{a1} {a1 a2} {a2} {a2 a3} {a3} {a3 a4} {a4} {a4 a5}...
然而,我想要以下内容,因为它将允许我计算每个连续事件之间的时间差。
{a1} {a1 a2} {a2 a3} {a3 a4} {a4 a5}...
我尝试过使用不同的AfterMatchSkipStrategy
设置以及IterativeCondition
子句,但没有成功。
标记"开始";可选是导致不需要的匹配的原因。我会寻找其他方法来为第一个事件生成违反信号——例如,也许你可以准备一个虚拟的第一个事件。
另一种方法是只使用CEP或SQL对流进行排序,然后使用RichFlatMap或有状态流程函数来实现业务逻辑:即计算差异并生成违规信号。
请参阅我可以使用Flink CEP对流进行排序吗?了解如何做到这一点。