用于 CEP 的简单 Scala API 示例不显示任何输出



我正在编写一个简单的示例,用于在Flink中测试CEP的新Scala API,使用最新的Github版本1.1-SNAPSHOT.

Pattern只是对一个值的检查,并为每个匹配的模式输出一个字符串作为结果。代码如下:

val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)
val cepEventAlert = CEP.pattern(streamingAlert, pattern)
def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
val startEvent = pattern.get("start").get
"Alerta:"+startEvent._1+": Pattern"
}
val patternStreamSelected = cepEventAlert.select(selectFn(_))
patternStreamSelected.print()

它在1.1-SNAPSHOT下编译和运行没有问题,但jobmanager输出没有显示print()的迹象。即使是放松模式条件,只设置一个"开始"(接受所有事件)也绝对没有任何回报。

此外,当尝试添加阶段时,代码无法编译。如果我将模式更改为(查找第三个字段小于4的两个连续事件):

Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

然后编译器抛出:

error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))

在"开始"阶段之后的第一个where()上显示错误。我试着用显式设置参数类型

(x: (String, Long, Int)) => x._3 < 4

这样,它会再次编译,但当它在Flink上运行时,不会显示任何输出。StreamingAlert是一个Scala数据流[(String,Long,Int)],在代码的其他部分,我可以使用_._ < 4进行过滤,而不会出现问题,并且输出似乎是正确的。

流式API中的print()API调用不会触发急切执行。您仍然需要在程序结束时调用env.execute()

当您定义您的模式时,您应该在某个地方提供事件类型。要么你按照你已经做的那样做,要么你通过begin:的类型参数来做

Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

相关内容

  • 没有找到相关文章

最新更新