带有列表的 Apache flink 模式条件



我写了一个模式。我有一个条件列表(从 json 获取规则)。数据(json)即将从kafka服务器中出来。我想使用此列表过滤数据。但它不起作用。我该怎么做? 我不确定键控流和警报。眨眼可以这样工作吗?

主程序:

package cep_kafka_eample.cep_kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema;
import util.AlarmPatterns;
import util.Rules;
import util.TypeProperties;
import java.io.FileReader;
import java.util.*;
public class MainClass {
public static void main( String[] args ) throws Exception
{
ObjectMapper mapper    = new ObjectMapper();
JsonParser parser = new JsonParser();
Object obj = parser.parse(new FileReader(
"c://new 5.json"));
JsonArray array = (JsonArray)obj;
Gson googleJson = new Gson();
List<Rules> ruleList = new ArrayList<>();
for(int i = 0; i< array.size() ; i++) {
Rules jsonObjList = googleJson.fromJson(array.get(i), Rules.class);
ruleList.add(jsonObjList);
}
//apache kafka properties
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("bootstrap.servers", "localhost:9092");
//starting flink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000).setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//get kafka values
FlinkKafkaConsumer010<ObjectNode> myConsumer = new FlinkKafkaConsumer010<>("demo", new JSONDeserializationSchema(),
properties);
List<Pattern<ObjectNode,?>> patternList = new ArrayList<>();
DataStream<ObjectNode> dataStream = env.addSource(myConsumer);
dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
DataStream<ObjectNode> keyedStream = dataStream;
//get pattern list, keyeddatastream
for(Rules rules : ruleList){
List<TypeProperties> typePropertiesList = rules.getTypePropList();
for (int i = 0; i < typePropertiesList.size(); i++) {
TypeProperties typeProperty = typePropertiesList.get(i);
if (typeProperty.getGroupType() != null && typeProperty.getGroupType().equals("group")) {
keyedStream = keyedStream.keyBy(
jsonNode -> jsonNode.get(typeProperty.getPropName().toString())
);
}
}
Pattern<ObjectNode,?> pattern = new AlarmPatterns().getAlarmPattern(rules);
patternList.add(pattern);
}
//CEP pattern and alarms
List<DataStream<Alert>> alertList = new ArrayList<>();
for(Pattern<ObjectNode,?> pattern : patternList){
PatternStream<ObjectNode> patternStream = CEP.pattern(keyedStream, pattern);
DataStream<Alert> alarms = patternStream.select(new PatternSelectFunction<ObjectNode, Alert>() {
private static final long serialVersionUID = 1L;
public Alert select(Map<String, List<ObjectNode>> map) throws Exception {
return new Alert("new message");
}
});
alertList.add(alarms);
}
env.execute("Flink CEP monitoring job");
}
}

getAlarmPattern:

package util;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class AlarmPatterns {
public Pattern<ObjectNode, ?> getAlarmPattern(Rules rules) {
//MySimpleConditions conditions = new MySimpleConditions();
Pattern<ObjectNode, ?>  alarmPattern = Pattern.<ObjectNode>begin("first")
.where(new IterativeCondition<ObjectNode>() {
@Override
public boolean filter(ObjectNode jsonNodes, Context<ObjectNode> context) throws Exception {
for (Criterias criterias : rules.getCriteriaList()) {
if (criterias.getCriteriaType().equals("equals")) {
return jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue());
} else if (criterias.getCriteriaType().equals("greaterThen")) {
if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) {
return false;
}
int count = 0;
for (ObjectNode node : context.getEventsForPattern("first")) {
count += node.get("value").asInt();
}
return Integer.compare(count, 5) > 0;
} else if (criterias.getCriteriaType().equals("lessThen")) {
if (!jsonNodes.get(criterias.getPropName()).equals(criterias.getCriteriaValue())) {
return false;
}
int count = 0;
for (ObjectNode node : context.getEventsForPattern("first")) {
count += node.get("value").asInt();
}
return Integer.compare(count, 5) < 0;
}
}
return false;
}
}).times(rules.getRuleCount());
return alarmPattern;
}
}

感谢您使用 FlinkCEP!

您能否提供有关错误消息(如果有)的更多详细信息?这将大大有助于解决问题。

从第一眼看代码,我可以做出以下观察:

起初,该行:

dataStream.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));

永远不会被执行,因为您永远不会在程序的其余部分使用此流。

其次,您应该指定在select()之后要采取的水槽,例如print()方法在每个PatternStream上。如果不这样做,则输出将被丢弃。您可以在此处查看示例,尽管该列表远非详尽无遗。

最后,我建议在模式中添加一个within()子句,这样您就不会耗尽内存。

错误来自我的 json 对象。我会修复它。当我在智能 cep 上运行作业不起作用时。当从 flink 控制台提交时,它可以工作。

相关内容

  • 没有找到相关文章

最新更新