我写了一篇 Flink CEP 文章,用Relaxed Contiguity
(followedBy
( 检查状态模式(由id
键控(。这个想法是在指定时间内第一个状态之后未到达特定状态时引发警报。
这有效,但是,如果此流中没有更多消息,则永远不会触发警报。但只有当具有某种随机状态的消息到达时,才会触发这一部分。
那么,当具有下一个序列的消息没有到达时间时,即使没有消息到达此流,如何使其触发警报?
Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
.where(new SimpleCondition<Transaction>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Transaction value) throws Exception {
return value.getStatus().equals(Status.STATUS_1);
}
})
.followedBy("end")
.where(new SimpleCondition<Transaction>() {
@Override
public boolean filter(Transaction value) throws Exception {
return value.getStatus().equals(Status.STATUS_2);
return amlveri;
}
}).within(Time.seconds(15));
PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
TypeInformation.of(Alert.class)) {};
SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
new PatternFlatTimeoutFunction<Transaction, Alert>() {
@Override
public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
throws Exception {
Transaction failedTrans = values.get("start").get(0);
arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
}
}, new PatternFlatSelectFunction<Transaction, Alert>() {
@Override
public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
throws Exception {
// do not do anything
}
});
select.getSideOutput(timedOutPartialMatchesTag).print();
如果您正在使用事件时间,则within()
方法正在等待水印触发事件时间计时器。但是由于没有事件到达,水印不会前进(假设您使用类似 BoundedOutOfOrdernessTimestampExtractor 的东西来生成水印(。
如果您需要在没有事件到达的情况下检测时间的流逝,则有必要使用处理时间。您可以将TimeCharacteristic
设置为处理时间,也可以实现一个水印生成器,该生成器使用处理时间计时器在缺少事件的情况下人为地推进水印。