我有一个Flink应用程序,处理来自2个流的数据。我正在使用一个表API,我想从一个流1中消费数据,并查询另一个流2,并获得最新时间戳的记录-
我现在有这个了-
def insert_into_output(output_table_name, event_table_name, code_table_name):
return """
INSERT INTO {0} (ip, sn, code, timestamp)
SELECT DISTINCT
ip, sn, code, timestamp
FROM {2} WHERE
sn =
(SELECT
sn
FROM {1}
WHERE timestamp =
(SELECT MAX(timestamp) FROM {1}))
""".format(output_table_name, event_table_name, code_table_name)
不幸的是,我得到一个错误陈述-doesn't support consuming update and delete changes which is produced by node GroupAggregate(groupBy=[ip, sn, code, timestamp], select=[ip, sn, code, timestamp])
。什么好主意吗?
使用MAX(TIMESTAMP)
的SQL查询的结果意味着结果可以不断更改,因为现在的时间戳可能比5分钟前的时间戳高。因此,该SQL语句的结果是一个收回流。你可以在表到流转换文档
你正在发送到Kinesis,但它不支持收回流,只支持追加流