我正在使用FlinkKafka对流应用规则。以下是示例代码:
ObjectMapper mapper = new ObjectMapper();
List<JsonNode> rulesList = null;
try {
// Read rule file
rulesList = mapper.readValue(new File("ruleFile"), new TypeReference<List<JsonNode>>(){});
} catch (IOException e1) {
System.out.println( "Error reading Rules file.");
System.exit(-1);
}
for (JsonNode jsonObject : rulesList) {
String id = (String) jsonObject.get("Id1").textValue();
// Form the pattern dynamically
Pattern<JsonNode, ?> pattern = null;
pattern = Pattern.<JsonNode>begin("start").where(new SimpleConditionImpl(jsonObject.get("rule1")));
// Create the pattern stream
PatternStream<JsonNode> patternStream = CEP.pattern(data, pattern);
}
但问题是,FlinkKafka 在我们启动程序时只读取文件一次,我希望新规则在运行时动态添加并应用于流。
有什么方法可以在 Flink Kafka 中实现这一点吗?
Flink 的 CEP 库(尚)不支持动态模式。(请参阅 FLINK-7129。
这样做的标准方法是使用广播状态在整个集群中通信和存储规则,但您必须想出一些方法来评估/执行规则。
有关示例,请参阅 https://training.da-platform.com/exercises/taxiQuery.html 和 https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/examples/datastream_java/broadcast/BroadcastState.java。