编写一个基于flink cep对事件进行分组的模式



我们使用flink cep作为一个独立的库来查找事件列表中的模式。

给定以下事件列表:

val patientKey = "patient"
val hrKey = "hr"
// Event
val p1e1 = Event("hr", mapOf(patientKey to 1, hr to 1))
val p1e2 = Event("hr", mapOf(patientKey to 1, hr to 2))
val p2e1 = Event("hr", mapOf(patientKey to 2, hr to 1))
val p1e3 = Event("hr", mapOf(patientKey to 1, hr to 3))
val p2e2 = Event("hr", mapOf(patientKey to 2, hr to 2))
val p3e1 = Event("hr", mapOf(patientKey to 3, hr to 1))
val p2e3 = Event("hr", mapOf(patientKey to 2, hr to 3))
val p3e2 = Event("hr", mapOf(patientKey to 3, hr to 2))
val p3e3 = Event("hr", mapOf(patientKey to 3, hr to 3))

我们想写一个匹配返回的模式:

第一个匹配:p1e1、p1e2、p1e3

第二次匹配:p2e1、p2e2、p2e3

第三匹配:p3e1、p3e2、p3e3

因此,在有键控流的flink环境中运行CEP似乎是可行的,但如果没有键控流,我们如何做到这一点呢。我们无法部署完整的flink env,因为我们在受限制的设备上运行。

我们希望在5秒内采集患者的所有心率。

感谢

您可以通过将键控约束放入模式定义中来获得对流设置键控的效果。如果您使用SQL:,请执行类似操作

PATTERN (A B C) WITHIN INTERVAL '5' SECOND
DEFINE
A AS A.hr = 1
B AS B.patientKey = A.patientKey AND B.hr = 2
C AS C.patientKey = B.patientKey AND C.hr = 3

如果不使用SQL,同样的逻辑也适用。(对于邻接性,您需要指定followedBy而不是next,因为您不会按patientKey对流进行分区。(

就其价值而言,我想不出避免键控流会带来任何操作或性能方面的好处。(事实上,CEP总是使用键控状态,即使您没有明确使用键控流。(键控流的使用使使用更大的Flink集群并并行操作成为可能,但并不需要。

最新更新