Apache Flink Table API Insert statement



我有一个Flink应用程序,处理来自2个流的数据。我正在使用一个表API,我想从一个流1中消费数据,并查询另一个流2,并获得最新时间戳的记录-

我现在有这个了-

def insert_into_output(output_table_name, event_table_name, code_table_name):
return """
INSERT INTO {0} (ip, sn, code, timestamp)
SELECT DISTINCT
ip, sn, code, timestamp
FROM {2} WHERE
sn =
(SELECT 
sn
FROM {1}
WHERE timestamp = 
(SELECT MAX(timestamp) FROM {1}))
""".format(output_table_name, event_table_name, code_table_name)

不幸的是,我得到一个错误陈述-doesn't support consuming update and delete changes which is produced by node GroupAggregate(groupBy=[ip, sn, code, timestamp], select=[ip, sn, code, timestamp])。什么好主意吗?

使用MAX(TIMESTAMP)的SQL查询的结果意味着结果可以不断更改,因为现在的时间戳可能比5分钟前的时间戳高。因此,该SQL语句的结果是一个收回流。你可以在表到流转换文档

你正在发送到Kinesis,但它不支持收回流,只支持追加流

最新更新