Dynamic pipelines in Apache Flink



I am working on a framework, built on Apache Flink, that allows customers to create their own plugins for my software. I have outlined the gist of it in the snippet below (just as a proof of concept), but when I try to upload it I get an org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.

I want to be able to split the input stream into some number x of different pipelines, then combine them back into a single output. What I have below is a simplified version of what I am starting from.

public class ContentBase {
  public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kf-service:9092");
    properties.setProperty("group.id", "varnost-content");
    // Set up the execution environment and get the stream from Kafka
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<ObjectNode> logs = see.addSource(new FlinkKafkaConsumer011<>("log-input",
                    new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest())
            .map((MapFunction<ObjectNode, ObjectNode>) jsonNodes -> (ObjectNode) jsonNodes.get("value"));

    // Create a new List of Streams, one for each "rule" that is being executed
    // For now, I have a simple custom wrapper on flink's `.filter` function in `MyClass.filter`
    List<String> codes = Arrays.asList("404", "200", "500");
    List<DataStream<ObjectNode>> outputs = new ArrayList<>();
    for (String code : codes) {
      outputs.add(MyClass.filter(logs, "response", code));
    }
    // It seemed as though I needed a seed DataStream to union all others on 
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode seedObject = (ObjectNode) mapper.readTree("{\"start\":\"true\"}");
    DataStream<ObjectNode> alerts = see.fromElements(seedObject);
    // Union the output of each "rule" above with the seed object to then output
    for (DataStream<ObjectNode> output : outputs) {
      alerts.union(output);
    }

    // Convert to string and sink to Kafka
    alerts.map((MapFunction<ObjectNode, String>) ObjectNode::toString)
            .addSink(new FlinkKafkaProducer011<>("kf-service:9092", "log-output", new SimpleStringSchema()));
    see.execute();
  }
}
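The `MyClass.filter` helper referenced above is not shown in the post. Presumably it wraps Flink's `.filter` with a predicate comparing a JSON field against a value; the core logic can be sketched with plain JDK types (the `filterByField` name and the use of `Map` in place of Jackson's `ObjectNode` are assumptions for illustration, not the actual implementation):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FilterSketch {
  // Hypothetical stand-in for MyClass.filter: keep records whose given
  // field equals the given value. In Flink this would be roughly
  // stream.filter(node -> value.equals(node.get(field).asText())).
  static List<Map<String, String>> filterByField(
      List<Map<String, String>> records, String field, String value) {
    return records.stream()
        .filter(r -> value.equals(r.get(field)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Map<String, String>> logs = List.of(
        Map.of("response", "404", "path", "/a"),
        Map.of("response", "200", "path", "/b"),
        Map.of("response", "404", "path", "/c"));
    // Two of the three records have response == 404
    System.out.println(filterByField(logs, "response", "404").size()); // prints 2
  }
}
```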

I don't know how to pull the actual error out of the Flink web interface to add that information here.

I found a couple of mistakes.

1) A stream execution environment can only have one input (apparently? I could be wrong about this), so adding the .fromElements input was no good

2) I forgot that all DataStreams are immutable
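The immutability point is why the union loop in the first version had no effect: `union` returns a new stream rather than mutating the receiver, so its result must be captured (e.g. `alerts = alerts.union(output);`). The same pitfall shows up with immutable JDK types such as `String`:

```java
public class ImmutableSketch {
  public static void main(String[] args) {
    String s = "a";
    s.concat("b");          // return value discarded: s is still "a",
                            // just like alerts.union(output) in the loop
    System.out.println(s);  // prints a
    s = s.concat("b");      // reassigning captures the new value
    System.out.println(s);  // prints ab
  }
}
```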

The end result ended up being much simpler

public class ContentBase {
  public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "kf-service:9092");
    properties.setProperty("group.id", "varnost-content");
    // Set up the execution environment and get the stream from Kafka
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<ObjectNode> logs = see.addSource(new FlinkKafkaConsumer011<>("log-input",
                    new JSONKeyValueDeserializationSchema(false), properties).setStartFromLatest())
            .map((MapFunction<ObjectNode, ObjectNode>) jsonNodes -> (ObjectNode) jsonNodes.get("value"));

    // Create a new List of Streams, one for each "rule" that is being executed
    // For now, I have a simple custom wrapper on flink's `.filter` function in `MyClass.filter`
    List<String> codes = Arrays.asList("404", "200", "500");
    List<DataStream<ObjectNode>> outputs = new ArrayList<>();
    for (String code : codes) {
      outputs.add(MyClass.filter(logs, "response", code));
    }
    Optional<DataStream<ObjectNode>> alerts = outputs.stream().reduce(DataStream::union);

    // Convert to string and sink to Kafka
    alerts.map((MapFunction<ObjectNode, String>) ObjectNode::toString)
            .addSink(new FlinkKafkaProducer011<>("kf-service:9092", "log-output", new SimpleStringSchema()));
    see.execute();
  }
}

The code you posted does not compile because of the last section (the conversion to string). You are mixing the Java Stream API's map with Flink's. Changing it to

alerts.get().map(ObjectNode::toString);

fixes it.
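The `Optional` comes from `reduce`: with no identity element, `Stream.reduce` returns `Optional.empty()` when the list of streams is empty, so the result has to be unwrapped before calling Flink's `map`. The same shape can be illustrated with plain JDK lists standing in for `DataStream::union` (the `merge` helper here is a hypothetical analogue, not Flink code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ReduceUnionSketch {
  // Analogue of outputs.stream().reduce(DataStream::union) using lists
  static Optional<List<String>> merge(List<List<String>> outputs) {
    return outputs.stream().reduce((a, b) -> {
      List<String> u = new ArrayList<>(a);
      u.addAll(b);
      return u;
    });
  }

  public static void main(String[] args) {
    List<List<String>> outputs =
        List.of(List.of("404"), List.of("200"), List.of("500"));
    // merge(...).get() unwraps the Optional, mirroring alerts.get().map(...)
    System.out.println(merge(outputs).get());         // prints [404, 200, 500]
    // With zero "rules" the Optional is empty, so .get() would throw;
    // calling .map() on the Optional itself would compile but silently no-op
    System.out.println(merge(List.of()).isPresent()); // prints false
  }
}
```

Note that `Optional.get()` throws if no rules matched; `orElseThrow` with a descriptive message (or a guard on the empty case) would make that failure mode explicit.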

Good luck.
