Apache Flink - deserializing JSON and performing a join



I am trying to use the Jackson library to read strings from a Kafka topic and perform a join with another stream.

Below is sample code with the two data streams. I want to perform a join on these message streams.

For example, the incoming streams are:

messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}

The join condition is messageStream1."A" = messageStream2."B". How can I implement this in Flink?

Data stream 1:

DataStream<String> messageStream1 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
// assign the mapped stream so it can be used in the join later
DataStream<JsonNode> jsonStream1 = messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue: " + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});

Data stream 2:

DataStream<String> messageStream2 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
// assign the mapped stream so it can be used in the join later
DataStream<JsonNode> jsonStream2 = messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue: " + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});

You need to extract the key field into an extra attribute so that Flink can access it (an alternative is to provide a custom key selector: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys).

Thus, the return type of map(...) could be Tuple2<String, JsonNode> (if String is the right type for the join attribute).
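The key-extraction step can be illustrated without a Flink runtime. Below is a stdlib-only sketch (class and method names are hypothetical; a real job would use Jackson's readTree as in the question) that pulls the join attribute out of the flat one-field messages from the example and pairs it with the original message, the same shape a Tuple2<String, JsonNode> would have:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class KeyExtractor {
    // Naive lookup that only works for flat {"name":"value"} objects.
    // fieldName is the join attribute: "A" for stream 1, "B" for stream 2.
    static Map.Entry<String, String> extractKey(String json, String fieldName) {
        String needle = "\"" + fieldName + "\":\"";
        int start = json.indexOf(needle);
        if (start < 0) {
            return null; // field not present
        }
        start += needle.length();
        int end = json.indexOf('"', start);
        // (key, original message) - the pair the map function would emit
        return new SimpleEntry<>(json.substring(start, end), json);
    }

    public static void main(String[] args) {
        System.out.println(extractKey("{\"A\":\"a\"}", "A").getKey()); // prints "a"
    }
}
```

With this shape, both streams emit pairs whose first element is the join key, which is what the positional `where(0).equalTo(0)` below relies on.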

You can then specify the join as described in the documentation (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html):

messageStream1.join(messageStream2)
    .where(0).equalTo(0) // join on attribute 0 of each stream, i.e., the String field of the Tuple2
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction() {...});

To perform a join with the DataStream API, you also have to specify a join window. Only tuples that fall into the same window can be joined.
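The window semantics can be modeled in plain Java. A minimal sketch (names are hypothetical, not Flink API) that joins the contents of one shared tumbling window on equal keys, mirroring what Flink does per window:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WindowJoinSketch {
    // Pair up elements of two streams that landed in the same tumbling window
    // and carry equal keys - a simplified model of Flink's windowed join.
    static List<String> joinWindow(List<Map.Entry<String, String>> window1,
                                   List<Map.Entry<String, String>> window2) {
        // index the left window by key
        Map<String, List<String>> byKey = new HashMap<>();
        for (Map.Entry<String, String> left : window1) {
            byKey.computeIfAbsent(left.getKey(), k -> new ArrayList<>()).add(left.getValue());
        }
        // probe with the right window; unmatched keys produce nothing (inner join)
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> right : window2) {
            for (String leftValue : byKey.getOrDefault(right.getKey(), List.of())) {
                joined.add(leftValue + "|" + right.getValue());
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> w1 = new ArrayList<>();
        w1.add(new AbstractMap.SimpleEntry<>("a", "{\"A\":\"a\"}"));
        List<Map.Entry<String, String>> w2 = new ArrayList<>();
        w2.add(new AbstractMap.SimpleEntry<>("a", "{\"B\":\"a\"}"));
        System.out.println(joinWindow(w1, w2)); // both records share key "a"
    }
}
```

Note the consequence of the windowing: two records with equal keys that fall into different 3-second windows are never passed to joinWindow together, so they produce no output.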
