I have a simple Flink application that tries to detect a pattern in an event stream created from the following text file:
1,A
2,B
3,C
4,A
5,C
6,B
7,D
8,D
9,A
10,D
I define the pattern like this:
Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
}).next("middle")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("B");
}
}).followedBy("end")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
});
and run the pattern detection with patternStream.process, as follows:
DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
@Override
public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent( endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
}
});
and with patternStream.flatSelect this way:
DataStream<DataEvent> result = patternStream.flatSelect(
new PatternFlatSelectFunction<DataEvent, DataEvent>() {
@Override
public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent(
endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
));
}
}
);
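However, the resulting event stream contains only the first matched pattern, not all of them. In both cases the output file contains only this line: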
4:A-B-A(1-2-4)
I also tried adding oneOrMore() at the end of the pattern definition, but then the result is:
4:A-B-A(1-2-4)
4:A-B-A(1-2-4)
I expected the process or select function to emit every possible combination of (A - B - followedBy - A), namely:
4:A-B-A(1-2-4)
4:A-B-A(1-2-9)
Also, if I add one more line before line 6 of the input file, containing "6,A":
1,A
2,B
3,C
4,A
5,C
6,A
7,B
8,D
9,D
10,A
11,D
then the result is:
10:A-B-A(6-7-10)
4:A-B-A(1-2-4)
This means that after finding the first match it starts pattern matching over from the beginning. How can I fix this?
My complete code is as follows:
package org.example;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.File;
import java.io.FileInputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class EventStreamCEP {
public static List<DataEvent> originalStream = new ArrayList<>();
public static List<DataEvent> complexEvents = new ArrayList<>();
public static void main(String[] args) throws Exception {
// Set up the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Define the input data format
TextInputFormat inputFormat = new TextInputFormat(new Path("/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt"));
// read the input data from a file
DataStream<DataEvent> eventStream = env.readFile(inputFormat, "/home/majidlotfian/flink/flink-quickstart/PLprivacy/input_folder/input.txt")
.map(new MapFunction<String, DataEvent>() {
@Override
public DataEvent map(String value) throws Exception {
// Parse the line into an event object
String[] fields = value.split(",");
// Parse as long directly and trim so that lines such as "6, A" are handled
long timestamp = Long.parseLong(fields[0].trim());
String type = fields[1].trim();
DataEvent event = new DataEvent(timestamp,type);
//event.setTimestamp(timestamp);
return event;
}
})
// Assign timestamps and watermarks
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<DataEvent>() {
private long currentMaxTimestamp;
private final long maxOutOfOrderness = 10000; // 10 seconds
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(DataEvent element, long previousElementTimestamp) {
long timestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
return timestamp;
}
});
// Define a pattern to detect events in the stream
Pattern<DataEvent, ?> pattern = Pattern.<DataEvent>begin("start")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
}).next("middle")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("B");
}
}).followedBy("end")
.where(new SimpleCondition<DataEvent>() {
@Override
public boolean filter(DataEvent dataEvent) throws Exception {
return dataEvent.getType().equals("A");
}
});
//pattern.oneOrMore();
// Create a pattern stream using the defined pattern
PatternStream<DataEvent> patternStream = CEP.pattern(eventStream, pattern);
/*
DataStream<DataEvent> result = patternStream.process(new PatternProcessFunction<DataEvent, DataEvent>() {
@Override
public void processMatch(Map<String, List<DataEvent>> map, Context context, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent( endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"));
}
});
*/
// Use PatternFlatSelectFunction to get all matched patterns
DataStream<DataEvent> result = patternStream.flatSelect(
new PatternFlatSelectFunction<DataEvent, DataEvent>() {
@Override
public void flatSelect(Map<String, List<DataEvent>> map, Collector<DataEvent> collector) throws Exception {
DataEvent startEvent = map.get("start").get(0);
DataEvent middleEvent = map.get("middle").get(0);
DataEvent endEvent = map.get("end").get(0);
collector.collect(new DataEvent(
endEvent.getTimestamp(),
startEvent.getType()+"-"+ middleEvent.getType()+"-"+ endEvent.getType() + "("+startEvent.getTimestamp()+"-" +middleEvent.getTimestamp()+"-" +endEvent.getTimestamp()+")"
));
}
}
);
// print the matched events
result.print();
// write the matched patterns to a text file
String outputPath = "/home/majidlotfian/flink/flink-quickstart/PLprivacy/output_folder/output.txt";
result.map(new MapFunction<DataEvent, String>() {
@Override
public String map(DataEvent value) throws Exception {
return value.getTimestamp()+":"+value.getType();
}
})
.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE)
.setParallelism(1); // ensure that events are written in order
env.execute("EventStreamCEP");
}
}
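For 4:A-B-A(1-2-9) to be matched, I believe you need to use followedByAny rather than followedBy. followedByAny relaxes the contiguity requirement further and allows matches that skip over events which would themselves have matched (here, the A at timestamp 4).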
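To make the difference concrete, here is a minimal plain-Java sketch that enumerates matches of "A, immediately followed by B, later followed by A" under both contiguity modes. It is only a model of the semantics as I understand them, not Flink's actual NFA: the class `ContiguityDemo` and its `matches` method are hypothetical names, and it assumes the default behavior where no after-match skip strategy is configured. In the real job the corresponding change would be replacing `.followedBy("end")` with `.followedByAny("end")`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models the two contiguity modes.
class ContiguityDemo {

    /**
     * Enumerates matches of: A, strictly next B, then a later A.
     * anyEnd = false models followedBy (only the first later A is taken);
     * anyEnd = true models followedByAny (every later A yields a match).
     */
    static List<String> matches(long[] ts, String[] types, boolean anyEnd) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 1 < types.length; i++) {
            // "start" must be A and "middle" must be the very next event (strict contiguity)
            if (!types[i].equals("A") || !types[i + 1].equals("B")) {
                continue;
            }
            for (int k = i + 2; k < types.length; k++) {
                if (types[k].equals("A")) {
                    out.add(ts[i] + "-" + ts[i + 1] + "-" + ts[k]);
                    if (!anyEnd) {
                        break; // followedBy: stop at the first event that matches "end"
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // First input file from the question
        long[] ts = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        String[] types = {"A", "B", "C", "A", "C", "B", "D", "D", "A", "D"};
        System.out.println(matches(ts, types, false)); // [1-2-4]
        System.out.println(matches(ts, types, true));  // [1-2-4, 1-2-9]
    }
}
```

On the second input from the question, the followedBy mode of this sketch yields 1-2-4 and 6-7-10, which agrees with the output reported there.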
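This is dangerous, though. The internal state machine will never be able to stop looking for further matches, and it will never release the partial matches. You will need some way to constrain the matching engine, for example by specifying a within clause or by adding a more constrained terminal node to the pattern.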
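The effect of a time bound can be sketched in the same plain-Java style. The hypothetical `WithinDemo.matchesWithin` below enumerates followedByAny-style matches but abandons a partial match once the span from its first event reaches `windowMillis`, which is how I understand `within` to behave (the exact boundary handling is an assumption, as is the requirement that timestamps arrive in ascending order). In Flink itself this would be a `.within(...)` call at the end of the pattern.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models a time-bounded followedByAny.
class WithinDemo {

    /**
     * Enumerates matches of: A, strictly next B, then any later A,
     * keeping only matches whose total span is shorter than windowMillis.
     * Assumes ts is sorted in ascending order.
     */
    static List<String> matchesWithin(long[] ts, String[] types, long windowMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i + 1 < types.length; i++) {
            if (!types[i].equals("A") || !types[i + 1].equals("B")) {
                continue;
            }
            for (int k = i + 2; k < types.length; k++) {
                if (ts[k] - ts[i] >= windowMillis) {
                    // Window expired: this partial match can be released,
                    // so the engine no longer has to track it.
                    break;
                }
                if (types[k].equals("A")) {
                    out.add(ts[i] + "-" + ts[i + 1] + "-" + ts[k]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        String[] types = {"A", "B", "C", "A", "C", "B", "D", "D", "A", "D"};
        System.out.println(matchesWithin(ts, types, 5));  // [1-2-4]
        System.out.println(matchesWithin(ts, types, 10)); // [1-2-4, 1-2-9]
    }
}
```

The point of the `break` is the one made above: without a bound, the partial match starting at timestamp 1 would have to be kept forever, because any future A could still complete it.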