我在以下 URL 中得到了 CEP 的示例 https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/cep/LongRides.java
"并且"此练习的目标是为出租车骑行发出 START 事件,这些事件在骑行的前 2 小时内未与 END 事件匹配。 但是,从下面的代码来看,似乎有一种模式来查找游乐设施已在 2 小时内完成,而不是在2 小时内未完成。
看起来模式首先找到开始事件,然后找到结束事件(!ride.isStart(,并且在 2 小时内,所以它不是解释为在 2 小时内找到游乐设施已完成的模式吗?
Pattern<TaxiRide, TaxiRide> completedRides =
Pattern.<TaxiRide>begin("start")
.where(new SimpleCondition<TaxiRide>() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return ride.isStart;
}
})
.next("end")
.where(new SimpleCondition<TaxiRide>() {
@Override
public boolean filter(TaxiRide ride) throws Exception {
return !ride.isStart;
}
});
// We want to find rides that have NOT been completed within 120 minutes
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));
我改进了示例解决方案中的注释,使其更清晰。
// We want to find rides that have NOT been completed within 120 minutes.
// This pattern matches rides that ARE completed.
// Below we will ignore rides that match this pattern, and emit those that timeout.
PatternStream<TaxiRide> patternStream = CEP.pattern(keyedRides, completedRides.within(Time.minutes(120)));
OutputTag<TaxiRide> timedout = new OutputTag<TaxiRide>("timedout"){};
SingleOutputStreamOperator<TaxiRide> longRides = patternStream.flatSelect(
timedout,
new TaxiRideTimedOut<TaxiRide>(),
new FlatSelectNothing<TaxiRide>()
);