我惊讶地发现 Flink 中没有 DataStream
的外部连接(DataStream docs)。
对于DataSet
,您拥有所有选项: leftOuterJoin
、rightOuterJoin
和 fullOuterJoin
,除了常规join
(数据集文档)。但对于DataStream
来说,你只有普通的旧连接。
这是否是由于DataStream
的某些基本属性使得无法进行外连接?或者也许我们可以在(接近?)的未来期待这一点?
我真的可以在DataStream
上使用外部联接来解决我正在处理的问题......有没有办法实现类似的行为?
DataStream.coGroup()
转换实现外部联接。CoGroupFunction
接收两个迭代器(每个输入一个),它们为某个键的所有元素提供服务,如果未找到匹配的元素,则可能为空。这允许实现外部连接功能。
对外连接的一流支持可能会在 Flink 的下一个版本中添加到 DataStream API 中。我目前不知道有任何这样的努力。但是,在Apache Flink JIRA中创建问题可能会有所帮助。
是从流 -> 表 -> 流,使用以下 API: 弗林克表 API - 外部连接
下面是一个 java 示例:
DataStream<String> data = env.readTextFile( ... );
DataStream<String> data2Merge = env.readTextFile( ... );
...
tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");
String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";
Table tableLeft = tableEnv.sqlQuery(queryLeft);
Table tableRight = tableEnv.sqlQuery(queryRight);
Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);
下面是如何使用 Flink Table API 在 DataStreams 上执行 FULL OUTER JOIN 的完整工作示例。有关 DataStream API integration
的更多详细信息,请参阅 Flink 官方页面的表 API。
步骤
1. Convert both dataStreams into tables.
2. Registers tables as views to execute SQL query.
3. Execute full outer join SQL query on registered views.
4. Result of the SQL query will be a table.
5. Convert the result table to dataStream using toDataStream.
/**
* Two input data streams
*
* <p>@<code>
* 1. "Alice", "Bob", "John"
* 2. "Mike", "Sam", "Adam", "Alice"
* <p>
* The expected full outer join result is
* (John), (null)
* (Bob), (null)
* (Alice), (Alice)
* (null), (Mike)
* (null), (Sam)
* (null), (Adam)
* </code>
*/
public class DataStreamFullOuterJoinUsingTable {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
// Convert stream into a table and register it as a view object to execute SQL query.
DataStream<String> nameStream = streamEnv.fromElements("Alice", "Bob", "John");
Table nameTable = tableEnv.fromDataStream(nameStream).as("name");
tableEnv.createTemporaryView("nameTable", nameTable);
// Convert stream into a table and register it as a view object to execute SQL query.
DataStream<String> detailStream = streamEnv.fromElements("Mike", "Sam", "Adam", "Alice");
Table detailTable = tableEnv.fromDataStream(detailStream).as("detail");
tableEnv.createTemporaryView("detailTable", detailTable);
// Execute SQL full outer join SQL query and the result will be a table
Table result =
tableEnv.sqlQuery(
"SELECT * FROM "
+ "nameTable FULL OUTER JOIN detailTable "
+ "ON nameTable.name = detailTable.detail");
// Convert the result table to a dataStream and map the Row objects to String using map
DataStream<String> resultStream =
tableEnv
.toDataStream(result)
.map(
(MapFunction<Row, String>)
row -> "(" + row.getField(0) + "), (" + row.getField(1) + ")");
// print as a sink
resultStream.print();
/*
(John), (null)
(Bob), (null)
(Alice), (Alice)
(null), (Mike)
(null), (Sam)
(null), (Adam)
*/
streamEnv.execute();
}
}
除了 flink-streaming-java,你将需要以下 3 个依赖项(提供的范围)来让 DataStream 和 Table 协同工作。
1. flink-table-api-java
2. flink-table-api-java-bridge
3. flink-table-planner_2.12
一旦您知道如何将 dataStream 转换为表,执行 SQL 查询,然后将其转换回 dataStream,您就可以解决许多类似的问题。