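I'm trying to read strings from a Kafka topic, parse them with the Jackson library, and join them with another stream.
Below is sample code for the two data streams. I want to join these message streams.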
For example, the incoming streams are:
messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}
Join condition: messageStream1."A" = messageStream2."B"
How can I implement this in Flink?
Data stream 1:
DataStream<String> messageStream1 = env.addSource(
        new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            // Print every top-level field of the parsed JSON object
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue: " + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});
Data stream 2:
DataStream<String> messageStream2 = env.addSource(
        new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema(), parameterTool.getProperties()));
messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            // Print every top-level field of the parsed JSON object
            Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String, JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue: " + field.getValue());
            }
            return rootNode;
        } catch (java.io.IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }
});
You need to extract the key field into a separate attribute so that Flink can access it (alternatively, you can provide a custom key selector: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys).
So the return type of map(...) could be Tuple2<String,JsonNode> (assuming String is the right type for the join attribute).
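To make the key-extraction step concrete without pulling in Flink or Jackson, here is a plain-JDK sketch. It assumes each parsed message is modeled as a `Map<String, String>` (a stand-in for `JsonNode`) and uses `Map.Entry<String, Map<String, String>>` as a stand-in for `Tuple2<String, JsonNode>`: the entry key plays the role of field 0 (the join key) and the entry value plays field 1 (the whole message). The names `KeyExtraction`, `fieldKey`, and `withKey` are hypothetical. In Flink the same logic would live inside the `map(...)` call, or the key function would be returned from a `KeySelector`.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: pairs each message with its join key, mirroring a
// map(...) that returns Tuple2<String, JsonNode> (key = field 0, message = field 1).
class KeyExtraction {

    // Builds a key extractor for one stream by reading the named field.
    // This plays the same role as a custom Flink KeySelector.
    // Returns null if the field is missing (a real job should handle that case).
    static Function<Map<String, String>, String> fieldKey(String fieldName) {
        return message -> message.get(fieldName);
    }

    // Returns (key, message), the analogue of new Tuple2<>(key, rootNode).
    static Map.Entry<String, Map<String, String>> withKey(
            Map<String, String> message,
            Function<Map<String, String>, String> keyOf) {
        return new AbstractMap.SimpleEntry<>(keyOf.apply(message), message);
    }

    public static void main(String[] args) {
        Map<String, String> m1 = new HashMap<>();
        m1.put("A", "a");   // messageStream1 = {"A":"a"}
        Map<String, String> m2 = new HashMap<>();
        m2.put("B", "a");   // messageStream2 = {"B":"a"}

        // Stream 1 is keyed on field "A", stream 2 on field "B".
        Map.Entry<String, Map<String, String>> t1 = withKey(m1, fieldKey("A"));
        Map.Entry<String, Map<String, String>> t2 = withKey(m2, fieldKey("B"));

        // Both records now carry the same key, so a join on field 0 matches them.
        System.out.println(t1.getKey() + " " + t2.getKey() + " " + t1.getKey().equals(t2.getKey()));
        // prints "a a true"
    }
}
```

Because each stream gets its own key function, the differing field names ("A" vs. "B") stop mattering once the key sits in field 0, which is why `.where(0).equalTo(0)` works for both sides.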
You can then join the two mapped streams as described in the documentation (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html):
messageStream1.join(messageStream2)
    .where(0).equalTo(0) // 0 on both sides: join on field 0, i.e., the String field of the Tuple2
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction() {...});
To perform a join with the DataStream API, you also need to specify a join window. Only tuples that belong to the same window can be joined.
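The windowing rule can be illustrated with a small plain-JDK simulation of a tumbling-window equi-join: two records match only if their keys are equal and their timestamps fall into the same fixed-size window. This is a sketch of the matching semantics only; it is not Flink's implementation and ignores how Flink assigns timestamps (processing vs. event time) and when it fires windows. The `TumblingWindowJoin` class, its `Event` type, and the payload labels are hypothetical, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulates the matching rule of a tumbling-window equi-join.
class TumblingWindowJoin {

    // Hypothetical record: join key, timestamp in milliseconds, and a label.
    static final class Event {
        final String key;
        final long timestampMs;
        final String payload;

        Event(String key, long timestampMs, String payload) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Start of the tumbling window that contains the timestamp (assumes ts >= 0).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Nested-loop join: emits "left|right" for every pair with the same key
    // whose timestamps land in the same window.
    static List<String> join(List<Event> left, List<Event> right, long windowSizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.timestampMs, windowSizeMs)
                        == windowStart(r.timestampMs, windowSizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.payload + "|" + r.payload);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long windowSizeMs = 3000L; // 3 seconds, as in the join above
        List<Event> stream1 = Arrays.asList(new Event("a", 1000L, "s1@1s"));
        List<Event> stream2 = Arrays.asList(
                new Event("a", 2500L, "s2@2.5s"),  // same key, window [0, 3000): joins
                new Event("a", 3500L, "s2@3.5s"),  // same key, window [3000, 6000): no join
                new Event("b", 2000L, "s2-b"));    // different key: no join
        System.out.println(join(stream1, stream2, windowSizeMs));
        // prints [s1@1s|s2@2.5s]
    }
}
```

Note that the record at 3.5 s has the same key as the one at 1 s but is not joined, because a tumbling window boundary at 3 s separates them. This is why the choice of window size directly affects which matches you will see.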