如何为以下内容制作模式和警报



我有这段代码,它给出了位置 ID 和 temp,我想要一种在温度> THRESHOLD_TEMPERATURE时发出警报的模式

我试过了:-

val pattern1: Pattern[Event,_] = Pattern.begin[Event]("first")
.subtype(Event.getClass)
.where( (evt -> evt.getTemp()) >= TEMPERATURE_THRESHOLD)
.within(Time.seconds(5))
val patternStream: PatternStream[Event] = CEP.pattern(f,pattern1)
val alerts: DataStream[String] = patternStream.flatSelect(
(in: Map[String,String], out: Collector[String]) => {
var first: String = in.get("first")
if (first >= TEMPERATURE_THRESHOLD){
out.collect("Temperature above danger zone")
}
}
)

这是要发出警报的代码:-

case class Event(locationID: String, temp: Double)
val TEMPERATURE_THRESHOLD: Double = 50.00
val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("bootstrap.servers", "localhost:9092")

val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
new JSONKeyValueDeserializationSchema(false), properties))

var ask = src.map{
r => r.get("value")
}

var data = ask.map { v => {
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
(loc, temperature)
}}
//    data.print()
var f = data.keyBy(
v => v._2
)
f.print()
see.execute()

模式变得过载,也是平坦的选择

CEP用于检测事件序列中的模式,这并不适合这个特定问题。查找临时>阈值不需要模式匹配的事件 - 一个简单的过滤器或平面图就可以完成这项工作。例如,您可以有一个平面地图,它将温度过高的每个事件转换为警报,并忽略所有其他事件。

至于您基于 CEP 的解决方案,我不明白您说的是哪里出了问题。但有几件事看起来不对劲。within子句不会做任何事情:由于您的模式只有一个事件长,因此它们没有任何持续时间。你正在通过温度来抠流,这似乎很奇怪。

相关内容

  • 没有找到相关文章