How to detect the pattern a+b+ with Flink CEP



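Flink CEP is not working for my pattern. I have a sequence such as aabbbbaaaabbabb, that is, repeated runs of a+b+. I want the process function to produce the following output: {aabbbb} {aaaabb} {abb}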

    // SPCondition is not shown in the question; presumably a custom IterativeCondition<JsonNode>.
    AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
    Pattern<JsonNode, JsonNode> attemptPattern = Pattern.<JsonNode>begin("first", skipStrategy)
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                // JsonNode exposes textValue(), not textvalue()
                return element.get("endpoint").textValue().equals("A");
            }
        }).oneOrMore()
        .next("second")
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                return element.get("endpoint").textValue().equals("B");
            }
        }).oneOrMore();

My result:

{aab} {aaaab} {ab}
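
The gap between the desired and the actual output looks like the gap between a greedy and a reluctant `b+`. As a rough illustration only, the sketch below uses `java.util.regex` (not Flink CEP; `Pattern` here is the JDK regex class, and `GreedyVsReluctant` and `findAll` are hypothetical names) to reproduce both results on the same input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GreedyVsReluctant {
    // Collects every non-overlapping match of regex in input, left to right.
    public static List<String> findAll(String regex, String input) {
        List<String> out = new ArrayList<>();
        Matcher m = Pattern.compile(regex).matcher(input);
        while (m.find()) {
            out.add(m.group());
        }
        return out;
    }

    public static void main(String[] args) {
        String input = "aabbbbaaaabbabb";
        // Greedy b+: each match takes every b it can (the desired output).
        System.out.println(findAll("a+b+", input));   // [aabbbb, aaaabb, abb]
        // Reluctant b+?: each match stops at the first b (the observed output).
        System.out.println(findAll("a+b+?", input));  // [aab, aaaab, ab]
    }
}
```

This is an analogy for the matching behaviour, not a description of how the CEP engine is implemented.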

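You need to somehow insist that it take all the Bs it can get, rather than matching right after the first one. Here is one way to do that: add a final stage that only matches an event that is not a b, and skip to that event after each match.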

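    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CEPExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // The trailing "x" gives the final run a non-"b" event to close on.
            DataStream<String> events = env.fromElements(
                "a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");
            // After a match, resume at the event that matched "end".
            AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
            Pattern<String, String> pattern = Pattern.<String>begin("first", skipStrategy)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return element.equals("a");
                    }
                }).oneOrMore()
                .next("second")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return element.equals("b");
                    }
                }).oneOrMore()
                // "end" matches only a non-"b" event, so the run of b's must be complete.
                .next("end")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return !element.equals("b");
                    }
                });
            PatternStream<String> patternStream = CEP.pattern(events, pattern);
            patternStream.select(new SelectSegment()).print();
            env.execute();
        }

        // Joins the a's and b's of a match; the "end" event is left out.
        public static class SelectSegment implements PatternSelectFunction<String, String> {
            @Override
            public String select(Map<String, List<String>> pattern) {
                return String.join("", pattern.get("first")) + String.join("", pattern.get("second"));
            }
        }
    }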
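
One consequence of the extra "end" stage is that a run of b's is only reported once some later event that is not a b arrives, which is why the sample input ends with "x". The plain-Java sketch below mimics that behaviour without Flink; it is not the CEP engine, and `SentinelSegmenter` and `segment` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SentinelSegmenter {
    // Splits events into a+b+ runs. A run is emitted only when an event that
    // is not "b" arrives after at least one "a" and one "b", which is the role
    // the "end" stage plays in the pattern above.
    public static List<String> segment(List<String> events) {
        List<String> out = new ArrayList<>();
        StringBuilder aRun = new StringBuilder();
        StringBuilder bRun = new StringBuilder();
        for (String e : events) {
            if (e.equals("b")) {
                if (aRun.length() > 0) {
                    bRun.append(e); // a b only counts once an a has been seen
                }
                continue;
            }
            // e is not "b": close the current run if it is complete.
            if (aRun.length() > 0 && bRun.length() > 0) {
                out.add(aRun.toString() + bRun.toString());
                aRun.setLength(0);
                bRun.setLength(0);
            }
            if (e.equals("a")) {
                aRun.append(e);     // extend or start a run of a's
            } else {
                aRun.setLength(0);  // any other event discards a partial run
                bRun.setLength(0);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
            "a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");
        System.out.println(segment(events));                              // [aabbbb, aaaabb, abb]
        // Without the trailing "x" the last run is never closed.
        System.out.println(segment(events.subList(0, events.size() - 1))); // [aabbbb, aaaabb]
    }
}
```

On an unbounded stream this means the most recent run stays pending until the next a (or any other non-b event) shows up.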

If you instead want to match a+b*, here is something that works, although I feel there should be a simpler solution:

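    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CEPExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> events = env.fromElements(
                "a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "x");
            AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
            Pattern<String, String> pattern = Pattern.<String>begin("a-or-b", skipStrategy)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return element.equals("a") || element.equals("b");
                    }
                }).oneOrMore()
                .next("end")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String element, Context<String> ctx) throws Exception {
                        // Events already accepted by "a-or-b" in this partial match.
                        List<String> list = new ArrayList<>();
                        ctx.getEventsForPattern("a-or-b").iterator().forEachRemaining(list::add);
                        int length = list.size();
                        // Anything other than a or b closes the run.
                        if (!element.equals("a") && !element.equals("b")) return true;
                        // An a directly after a b starts the next run, so it closes this one.
                        return length >= 1 && element.equals("a") && list.get(length - 1).equals("b");
                    }
                });
            PatternStream<String> patternStream = CEP.pattern(events, pattern);
            patternStream.select(new SelectSegment()).print();
            env.execute();
        }

        public static class SelectSegment implements PatternSelectFunction<String, String> {
            @Override
            public String select(Map<String, List<String>> pattern) {
                return String.join("", pattern.get("a-or-b"));
            }
        }
    }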
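
The decision made inside the "end" condition can be easier to follow in isolation. The sketch below pulls the same boolean logic out into a plain static method so it can be checked without a Flink job; `EndConditionSketch` and `closesRun` are hypothetical names, and `taken` stands in for what `ctx.getEventsForPattern("a-or-b")` would return.

```java
import java.util.Arrays;
import java.util.List;

public class EndConditionSketch {
    // Mirrors the "end" IterativeCondition: given the events already taken by
    // "a-or-b", decide whether element closes the current a+b* run.
    public static boolean closesRun(List<String> taken, String element) {
        boolean isA = element.equals("a");
        boolean isB = element.equals("b");
        if (!isA && !isB) {
            return true; // any other event ends the run
        }
        boolean lastWasB = !taken.isEmpty() && taken.get(taken.size() - 1).equals("b");
        return isA && lastWasB; // an "a" right after a "b" belongs to the next run
    }

    public static void main(String[] args) {
        System.out.println(closesRun(Arrays.asList("a", "a", "b", "b"), "a")); // true
        System.out.println(closesRun(Arrays.asList("a", "a"), "a"));           // false
        System.out.println(closesRun(Arrays.asList("a", "a", "b"), "b"));      // false
        System.out.println(closesRun(Arrays.asList("a", "a"), "x"));           // true
    }
}
```

Note that the condition only checks the last accepted event; it does not by itself verify that the accepted events start with an a.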

For what it's worth, I usually find that MATCH_RECOGNIZE provides a more straightforward DSL for pattern matching with Flink.
