在Flink SQL中加入连续查询



我试图加入两个连续的查询,但一直遇到以下错误:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.nPlease check the documentation for the set of currently supported SQL features.

以下是表格定义:

CREATE TABLE `Combined` (
`machineID` STRING,
`cycleID` BIGINT,
`start` TIMESTAMP(3),
`end` TIMESTAMP(3),
WATERMARK FOR `end` AS `end` - INTERVAL '5' SECOND,
`sensor1` FLOAT,
`sensor2` FLOAT
)

和插入查询

INSERT INTO `Combined` 
SELECT
a.`MachineID`,
a.`cycleID`,
MAX(a.`start`) `start`,
MAX(a.`end`) `end`,
MAX(a.`sensor1`) `sensor1`,
MAX(m.`sensor2`) `sensor2`
FROM `Aggregated` a, `MachineStatus` m
WHERE 
a.`MachineID` = m.`MachineID` AND 
a.`cycleID` = m.`cycleID` AND 
a.`start` = m.`timestamp`
GROUP BY a.`MachineID`, a.`cycleID`, SESSION(a.`start`, INTERVAL '1' SECOND)

在源表AggregatedMachineStatus中,starttimestamp列是具有水印的时间属性。

我尝试过将联接的输入行强制转换为时间戳,但这并没有解决问题,这意味着我不能使用SESSION,它应该确保每个周期只记录一个数据点。

非常感谢您的帮助!

我对此进行了进一步的研究,并注意到GROUP BY语句在该上下文中没有意义。

此外,SESSION可以用时间窗口代替,这是更惯用的方法。

INSERT INTO `Combined` 
SELECT
a.`MachineID`,
a.`cycleID`,
a.`start`,
a.`end`,
a.`sensor1`,
m.`sensor2`
FROM `Aggregated` a, `MachineStatus` m
WHERE 
a.`MachineID` = m.`MachineID` AND 
a.`cycleID` = m.`cycleID` AND 
m.`timestamp` BETWEEN a.`start` AND a.`start` + INTERVAL '0' SECOND

为了理解连接动态表的不同方法,我发现VervericaSQL培训非常有用。

相关内容

  • 没有找到相关文章

最新更新