我有下面的SQL查询,我在flink作业中使用。mysql_table
是使用JDBC连接器创建的,kafa_source
表是从传入kafka流创建的。
SELECT T.event_id, T.event_name, TUMBLE_END(T.event_time, INTERVAL '5' MINUTE) AS event_time,
MAX(T.event_value) AS max_event_value FROM (
SELECT d.event_id, d.event_name, d.event_source_id, d.event_key, s.event_value, s.event_time
FROM kafka_source s JOIN mysql_table FOR SYSTEM_TIME AS OF s.proc_time AS d
ON d.event_id = s.event_id and d.event_name = s.event_name) T
GROUP BY T.event_id, T.event_key, TUMBLE(T.event_time, INTERVAL '5' MINUTE)
我正在执行两者之间的时间连接,当我在Flink的sql-client CLI(用flink-faker
测试)中检查时,这运行良好。内部查询工作得很好,正在打印结果。谁能帮我找出问题的查询?
编辑:我正在寻找像这样在5分钟内创建的TUMBLE事件的输出
+I ("11", "SPILL_OVER", 2022-04-28T00:30:00", 28.0)
+I ("11", "SPILL_OVER", 2022-04-28T00:35:00", 32.4)
+I ("11", "SPILL_OVER", 2022-04-28T00:40:00", 19.6)
+I ("11", "SPILL_OVER", 2022-04-28T00:45:00", 22.3)
mysql表的schema是
+-----------------+--------------+
| Field | Type |
+-----------------+--------------+
| event_id | varchar(64) |
| event_source_id | varchar(255) |
| event_name | varchar(255) |
和kafka表的schema是
event_id STRING
event_name STRING
event_time TIMESTAMP(9)
event_value DOUBLE
编辑:我观察到TUMBLE_END
与PROCTIME()
列一起工作很好,但与event_time
不一样。下面是分别选择proc_time
和event_time
的查询的输出。
+I[2022-05-09T14:36:21.078Z, 2022-05-09T14:36:14.163Z]
+I[2022-05-09T14:36:21.079Z, 2022-05-09T14:36:19.170Z]
下面的查询工作在kafka_source
表上。
SELECT event_id, event_name, MAX(event_value), TUMBLE_END(proc_time, INTERVAL '2' MINUTE)
FROM kafka_source
GROUP BY event_id, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE)
并给出如下类似的输出
+I[[11, SPILL_OVER, 2022-05-09T20:10, 0.93]
+I[[12, SPILL_OVER, 2022-05-09T20:10, 0.9]
+I[[11, PRAXY, 2022-05-09T20:12, 0.91]
当我使用event_time
代替proc_time
时,相同的查询不返回任何结果。我在表中创建这些列,如下所示:
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTime, 3)")
.watermark("event_time", "event_time - INTERVAL '20' SECOND")
.build()
其中eventTime
是来自kafka主题的传入时间戳值。两个字段的类型相同,TIMESTAMP_LTZ(3)
proc_time TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,
event_time TIMESTAMP_LTZ(3) *ROWTIME*
我在这里犯了什么错误?
我通过在表环境中添加一个设置来解决这个问题。
tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")
我可以在设置超时时间后立即创建窗口