我有一个流输入,比如股票价格数据(包括多只股票),我想每 1 分钟按它们的价格进行一次排名。排名基于所有股票的最新价格,无论是否在前 1 分钟内更新,都需要对所有股票进行排序。我尝试在 flink 流 SQL 中使用 ORDER BY。
我未能实现我的逻辑,我对两个部分感到困惑:
-
为什么
ORDER BY
只能使用时间属性作为主要属性,而只能支持ASC
?如何按价格等其他类型执行订单? -
下面的 SQL(来自 Flink 文档)是什么意思?没有窗口,也没有窗口,所以我假设每个订单都会立即执行 SQL,在这种情况下,对一个元素进行排序看起来毫无意义。
[更新]:当我阅读 ProcimeSortProcessFunction.scala 的代码时,似乎 Flink 对接下来一毫秒内收到的元素进行了排序。
SELECT *
FROM Orders
ORDER BY orderTime
最后,有没有办法在SQL中实现我的逻辑?
流式查询中的ORDER BY
很难计算,因为当我们必须发出需要转到结果表开头的结果时,我们不想更新整个结果。因此,我们仅在可以保证结果具有(大致)增加的时间戳时才支持ORDER BY time-attribute
。
将来(Flink 1.6 或更高版本),我们还将支持一些查询,例如ORDER BY x ASC LIMIT 10
,这将导致一个更新表,其中包含具有 10 个最小x
值的记录。
无论如何,您无法(轻松)使用GROUP BY
翻转窗口计算每分钟的 top-k 排名。GROUP BY
查询将组的记录(在GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE)
的情况下也是窗口)聚合到单个记录中。因此,每分钟不会有多个记录,而只有一个。
如果您希望查询计算每分钟字段a
的前 10 名,则需要类似于以下内容的查询:
SELECT a, b, c
FROM (
SELECT
a, b, c,
RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank
FROM yourTable)
WHERE rank <= 10
但是,Flink(版本 1.4)尚不支持此类查询,因为 time 属性用于PARTITION BY
子句,而不是OVER
窗口的ORDER BY
子句。