我的问题是在JOIN
操作之后无法使用ORDER BY
子句。为了重现问题,
CREATE TABLE stack (
id INT PRIMARY KEY,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5',
'fields.id.kind'='sequence',
'fields.id.start'='1',
'fields.id.end'='100'
);
该表在ts
上具有水印策略和TIMESTAMP(3) *ROWTIME*
类型。
Flink SQL> DESC stack;
+------+------------------------+-------+---------+--------+----------------------------+
| name | type | null | key | extras | watermark |
+------+------------------------+-------+---------+--------+----------------------------+
| id | INT | FALSE | PRI(id) | | |
| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | `ts` - INTERVAL '1' SECOND |
+------+------------------------+-------+---------+--------+----------------------------+
2 rows in set
但是,如果我将视图定义为简单的自联接
CREATE VIEW self_join AS (
SELECT l.ts, l.id, r.id
FROM stack as l INNER JOIN stack as r
ON l.id=r.id
);
它失去了水印策略,但没有类型,
Flink SQL> DESC self_join;
+------+------------------------+-------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+------------------------+-------+-----+--------+-----------+
| ts | TIMESTAMP(3) *ROWTIME* | TRUE | | | |
| id | INT | FALSE | | | |
| id0 | INT | FALSE | | | |
+------+------------------------+-------+-----+--------+-----------+
3 rows in set
我假设我们可以保留水印策略,并在JOIN
操作后使用ORDER BY
,但事实并非如此。如何将水印策略再次添加到VIEW
?
提前谢谢。
每当Flink SQL在流模式下执行常规联接(没有任何时间约束的联接(时,结果都不可能有水印。这反过来意味着你不能对结果进行排序或应用窗口化。
为什么会这样,你能做些什么?
背景
Flink SQL使用时间属性(在本例中为stack.ts
(来优化状态保持。因为stack
流/表有一个时间属性,所以我们知道这个流或多或少会按时间顺序进行处理(元素被限制为最多1秒无序(。这样就严格限制了必须保留多少状态才能执行像排序这个表这样的操作——1秒长的缓冲区就足够了。
如果stack
上没有定义时间属性(即,定义了水印的时间戳字段(,那么Flink SQL将拒绝对其进行排序(在流模式下(,因为这样做需要保持无限量的状态,并且不可能知道在发出第一个结果之前要等多久。
常规联接的结果不能具有定义良好的水印策略
任何类型的常规联接都要求Flink在其状态后端永远存储输入表的所有行(Flink愿意尝试这样做(。但更重要的是,水印并没有对结果进行定义,因为它可能有多乱没有限制
你能做什么
如果将联接修改为间隔联接或临时联接,则结果仍将带有水印。例如,您可以这样做:
CREATE VIEW self_join AS (
SELECT l.ts, l.id, r.id
FROM stack as l INNER JOIN stack as r
ON l.id=r.id
WHERE ls.ts BETWEEN r.ts - INTERVAL '1' MINUTE AND r.ts
);
或者你可以这样做:
CREATE VIEW self_join AS (
SELECT l.ts, l.id, r.id
FROM stack as l INNER JOIN stack as r FOR SYSTEM_TIME AS OF r.ts
ON l.id=r.id
);
在这两种情况下,Flink的SQL引擎将能够比常规联接保持更少的状态,并且能够在输出流/表中生成水印。
另一种可能的解决方案是将结果表转换为DataStream,然后使用DataStream API应用水印,然后将该流转换回表。但只有当你有一些领域知识,让你知道结果流可能有多无序,并且你可能已经将相同的信息表达为区间或时间连接时,这才有意义。