我正在努力创建一个框架,以允许客户为我的软件创建自己的插件,以构建Apache Flink。我在我试图工作的片段中概述了摘要(只是作为概念证明),但是在尝试上传时,我会遇到org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
错误。
我希望能够将输入流分配到x
数量的不同管道中,然后将它们组合在一起成单个输出。我在下面拥有的只是我从我开始的简化版本。
public class ContentBase {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kf-service:9092");
properties.setProperty("group.id", "varnost-content");
// Setup up execution environment and get stream from Kafka
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> logs = see.addSource(new FlinkKafkaConsumer011<>("log-input",
new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest())
.map((MapFunction<ObjectNode, ObjectNode>) jsonNodes -> (ObjectNode) jsonNodes.get("value"));
// Create a new List of Streams, one for each "rule" that is being executed
// For now, I have a simple custom wrapper on flink's `.filter` function in `MyClass.filter`
List<String> codes = Arrays.asList("404", "200", "500");
List<DataStream<ObjectNode>> outputs = new ArrayList<>();
for (String code : codes) {
outputs.add(MyClass.filter(logs, "response", code));
}
// It seemed as though I needed a seed DataStream to union all others on
ObjectMapper mapper = new ObjectMapper();
ObjectNode seedObject = (ObjectNode) mapper.readTree("{"start":"true"");
DataStream<ObjectNode> alerts = see.fromElements(seedObject);
// Union the output of each "rule" above with the seed object to then output
for (DataStream<ObjectNode> output : outputs) {
alerts.union(output);
}
// Convert to string and sink to Kafka
alerts.map((MapFunction<ObjectNode, String>) ObjectNode::toString)
.addSink(new FlinkKafkaProducer011<>("kf-service:9092", "log-output", new SimpleStringSchema()));
see.execute();
}
}
我不知道如何在flink Web界面中获取实际错误以在此处添加该信息
我发现了一些错误。
1)流执行环境只能有一个输入(显然是?我可能是错误的),因此添加
.fromElements
输入不好
2)我忘记了所有数据流都是不可变的
最终结果最终更简单
public class ContentBase {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kf-service:9092");
properties.setProperty("group.id", "varnost-content");
// Setup up execution environment and get stream from Kafka
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> logs = see.addSource(new FlinkKafkaConsumer011<>("log-input",
new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest())
.map((MapFunction<ObjectNode, ObjectNode>) jsonNodes -> (ObjectNode) jsonNodes.get("value"));
// Create a new List of Streams, one for each "rule" that is being executed
// For now, I have a simple custom wrapper on flink's `.filter` function in `MyClass.filter`
List<String> codes = Arrays.asList("404", "200", "500");
List<DataStream<ObjectNode>> outputs = new ArrayList<>();
for (String code : codes) {
outputs.add(MyClass.filter(logs, "response", code));
}
Optional<DataStream<ObjectNode>> alerts = outputs.stream().reduce(DataStream::union);
// Convert to string and sink to Kafka
alerts.map((MapFunction<ObjectNode, String>) ObjectNode::toString)
.addSink(new FlinkKafkaProducer011<>("kf-service:9092", "log-output", new SimpleStringSchema()));
see.execute();
}
}
由于最后一个部分代码(即转换为字符串),您发布的代码无法通过。您将Java流API map
与Flink One混合。将其更改为
alerts.get().map(ObjectNode::toString);
可以修复它。
祝你好运。