假设我有一个数据流
x:1, y:2 , z:3 , x:7 , y:-1, z:0, z:3 , z:2, y:3 ,x: 2 ,y:6
如何将x,y,z
放入自己的存储桶中并对其应用我的 CEP 规则。
x:1, x:7,x: 2
y:2, y:-1, y:3 , y:6
z:3, z:0 , z:3, z:2
或者换一种说法。如何将流拆分为这些类别(每个 x,y,z 一个流)。我会得到 3 个子流,它们有自己的 CEP 处理。
这里的挑战是,x,y,z不是预定义的。所以我不能预先创建流并使用 if 或 switch 语句进行分配。
编辑:模式类似于 ," 如果 x 值在过去 10 分钟内介于 0 - 8 之间
这是通过在类别属性上"键入"流来完成的。
如果您有DataStream[(String, Int)]
,则如下所示:
val yourStream: DataStream[(String, Int)] = ???
val yourPattern: Pattern = ???
// key by String attribute
val keyedStream = yourStream.keyBy(_._1)
// apply pattern on keyed stream
val patternStream: PatternStream = CEP.pattern(keyedStream, yourPattern)
将针对键控属性的每个不同值评估模式。