Scala 中单个模式的 Flink CEP 迭代条件



我面临的问题是我无法在scala中对单个CEP模式执行求和。我想检测特定客户 ID 的总和何时大于 6100。我正在为 CEP.pattern(...( 提供一个键控流。我在下面提供了用于模式构建的代码。

val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore

我的输入是 avro 格式,Flink 正在从 kafka 消费它。输入如下所示 -:

{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```

但是,下面的代码在使用两种模式时运行良好:

val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore

getEventsForPattern返回模式已匹配的值。我们来分析一下客户27。处理事件时

{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}

您的第一个代码段拒绝此消息,因为它不满足条件:sum + amount = 0 + 6094 < 6100。加工时

{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}

您的条件将再次检查是否0 + 547 > 6100,这就是您看不到输出的原因。

在第二个示例中,您使用的是运算符followedBy这意味着您将处理元素对。第一笔交易被无条件接受(因为您不包括where运营商(,现在它将通过ctx.getEventsForPattern("start")调用返回。我希望您了解此代码的行为。


CEP主要用于发现数据中的模式,而不是聚合它们。您的问题可以通过执行窗口化然后过滤来解决 - 无需在此处使用CEP

最新更新