我有以下用例。
有一台机器正在向 Kafka 发送事件流,CEP engine
接收这些事件流,当满足流数据的条件时会生成警告。
FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<Event>(kafkaInputTopic, new EventDeserializationSchema(), properties);
DataStream<Event> eventStream = env.addSource(kafkaSource);
事件 POJO 包含 ID、名称、时间、IP。
机器会向Kafka发送大量数据,机器上有35个唯一的事件名称(如name1,name2.....name35),我想检测每个事件名称组合的模式(例如 name1 与 name2 共同出现,名称 1 与 name3.. 等)。我总共有 1225 种组合。
规则 POJO 包含 e1Name 和 e2Name。
List<Rule> ruleList -> It contains 1225 rules.
for (Rule rule : ruleList) {
Pattern<Event, ?> warningPattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE1Name())) {
return true;
}
return false;
}
}).followedBy("next").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
if(value.getName().equals(rule.getE2Name())) {
return true;
}
return false;
}
}).within(Time.seconds(30));
PatternStream patternStream = CEP.pattern(eventStream, warningPattern);
}
这是在一个流数据上执行多个模式的正确方法,还是有任何优化的方法来实现这一点。使用上述方法,我们会遇到PartitionNotFoundException
、UnknownTaskExecutorException
和内存问题。
IMO 你不需要模式来实现你的目标。您可以为源定义一个有状态映射函数,该函数将事件名称映射为对(最近两个名称)。之后,将源窗口设置为 30 秒,并将简单的字数统计示例应用于源。
有状态映射函数可以是这样的(只接受事件名称,你需要根据你的输入 -extract 事件名称等进行更改):
public class TupleMap implements MapFunction<String, Tuple2<String, Integer>>{
Tuple2<String, String> latestTuple = new Tuple2<String, String>();
public Tuple2<String, Integer> map(String value) throws Exception {
this.latestTuple.f0 = this.latestTuple.f1;
this.latestTuple.f1 = value;
return new Tuple2<String, Integer>(this.latestTuple.f0 + this.latestTuple.f1, 1);
}
}
并且具有事件名称对和发生次数的结果可以像这样获得(也许写入 Kafka 接收器?
DataStream<Tuple2<String, Integer>> source = stream.map(new TupleMap());
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.keyBy(0).timeWindow(Time.seconds(30)).sum(1);