如何停止Apache Flink CEP模式



请帮帮我,我有两个问题:
我从Apache Kafka json消息中读取,(然后我有步骤:反序列化到POJO、filter、keyBy……(

  1. KeyedProcessFunction(带状态、定时器、if-else逻辑块(和Flink CEP模式库哪个更好

我可以在KeyedProcessFunction中检查输入序列(检查状态,如果其他块,out.collect(…(,state.clear((…你会理解我的(,我也可以使用带有条件和量化器的Flink CEP库

  1. 如何停止投掷CEP图案

例如:
我有输入序列:A1、(1分钟内无事件(A2、(5分钟内无活动(А3、(1 min内无活动。(A1和A5之间可能有很多事件(
我想发送输出:A1、A3、A5
第一个事件,如果下一个事件发生在上一个事件之后不到5分钟,则不会发送到输出;如果下一事件发生在前一事件之后超过5分钟,将发送到输出
我应该向我的模式添加什么

Pattern<Event, ?> pattern = Pattern.
<Event>begin("start")
.where(new SimpleCondition<Event>(){
public boolean filter(Event event){
return event.getName().contains("A");
}
}).within(Time.minutes(5));

虽然乍一看,这个特定的例子作为KeyedProcessFunction实现起来似乎很琐碎,但如果消息可能无序到达,肯定会产生一些复杂性。然后你可能会被愚弄,以为会有很大的差距,而事实上并没有。

然而,如果您想要一个简单、开箱即用的现成解决方案,那么这个特定的示例非常适合会话窗口。

有了CEP,我认为一个有效的解决方案会有这样的味道:你正在寻找一个a(称为A1(的序列,然后是另一个a的序列(称为A2(,其中(A2.timestamp-A1.timestamp(>=5分钟。当发现匹配时,发射A1并推进匹配引擎,使A2成为新的A1。(方便的是,CEP会对输入流进行预排序,这样您就不必担心事情会出错。(

最新更新