作为flink CEP文档中的描述:
Strict Contiguity:
期望所有匹配的事件严格一个接一个地出现,中间没有任何不匹配的事件。Relaxed Contiguity
:忽略在匹配事件之间出现的非匹配事件。Non-Deterministic Relaxed Contiguity
:进一步放松连续性,允许额外的匹配忽略一些匹配事件。
第一个例子很容易理解:
给出模式:"a b"
和输入"a", "c", "b1", "b2"
- 严格连续输出:
{} (no match)
- 松散邻接输出:
{a b1}
- 非确定性松弛邻接输出:
{a b1}, {a b2}
但是Contiguity within looping patterns
的例子真的很难理解:
给定模式:"a b+ c"
.
和输入:"a", "b1", "d1", "b2", "d2", "b3" "c"
- 严格连续:{a b3 c}
- 松弛连续度:{a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}
- 非确定性松弛连续:{a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 b2 c}, {a b2 b3 c}, {a b3 c}
the Strict Contiguity output {a b3 c}
,但这与Strict Contiguity
中的描述相反,因为在a
和b3
之间有许多non-matching events
。
我相信你是对的。在严格的连续性下,它根本不匹配。我写了下面的例子来确保:
public class StreamingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> events = env.fromElements("a", "b1", "d1", "b2", "d2", "b3", "c");
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<String, String> pattern =
Pattern.<String>begin("a", skipStrategy)
.where(
new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return element.startsWith("a");
}
})
.next("b+")
.where(
new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return element.startsWith("b");
}
})
.oneOrMore().consecutive()
.next("c")
.where(
new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return element.startsWith("c");
}
});
PatternStream<String> patternStream = CEP.pattern(events, pattern).inProcessingTime();
patternStream.select(new SelectSegment()).addSink(new PrintSinkFunction<>(true));
env.execute();
}
public static class SelectSegment implements PatternSelectFunction<String, String> {
public String select(Map<String, List<String>> pattern) {
return String.join("", pattern.get("a"))
+ String.join("", pattern.get("b+"))
+ String.join("", pattern.get("c"));
}
}
}
我已经创建了FLINK-27456来跟踪这个。