//创建一个包含十个项目的窗口
WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);
应用窗口函数,添加一些自定义评估窗口中的所有值
DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out)
//custom evaluation logic
out.collect(new ObservationEvent(1,"temperature", "stable"));
}
});
定义简单的 CEP 模式
Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first")
.subtype(ObservationEvent.class)
.where(new FilterFunction<ObservationEvent>() {
@Override
public boolean filter(ObservationEvent arg0) throws Exception {
System.out.println( arg0 ); //This function is not at all called
return false;
}
});
PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern);
当我运行此代码时,根本不调用 where 子句中的过滤器函数。我已经打印了inactivityStream.print(),我可以看到匹配的值。
现在,当我直接插入输入流而不应用窗口时。模式匹配
我打印了inputStream和WindowedStream,我可以看到它们都发送类似的数据。
我错过了什么
FilterFunction 最终应该被调用,但您必须等待 10 个事件来获取相同的键,然后才能看到第一次调用 FilterFunction。 难道您只是在窗口测试中等待的时间不够长吗?
请记住,如果您有许多唯一键,这意味着您必须在窗口测试中等待 10 倍以上的时间,然后才能看到调用的过滤器函数。