我想使用 Flink Table API 连接同一字段上的两个表。
我想实现
SELECT
a.id
b.id
FROM
table1 AS a
JOIN
table2 AS b
ON
a.id = b.id
我试过了,但发现实现目标的唯一方法是
val table1 = tableEnv.fromDataSet(dbData, "id1")
val table2 = tableEnv.fromDataSet(dbData, "id2")
val res = table1.join(table2).where("id1=id2")
但我想重用键"id"。
我在 Flink 文档中找到了这个:
两个表必须具有不同的字段名称,并且必须使用 where 或 filter 运算符定义相等连接谓词。
如何重复使用归档的密钥?
联接结果的架构是两个输入架构的串联。 例如,如果您left: [a, b, c]
并right: [d, e, f]
left
和right
联接的结果是具有架构[a, b, c, d, e, f]
的新Table
。
如果left
和right
具有同名的字段,则后续操作(如select
(无法在没有歧义的情况下标识字段。
所以基本上
val left: Table = ??? // [id, valLeft]
val right: Table = ??? // [id, valRight]
val result: Table = left.join(right).where('id === 'id)
等效于 SQL 查询
SELECT l.id AS id, l.valLeft, r.id AS id, r.valRight
FROM left l, right r
WHERE l.id = r.id
此 SQL 查询也不会被接受,因为结果中有两个名为id
的字段。
您可以通过以下任一方式解决问题
- 仅重命名其中一个输入字段并重复使用另一个输入字段 重命名
- 两个输入字段,并在与
as
连接后重命名其中一个