我使用Flink TableApi和sql来计算每个翻滚窗口中的不同用户,但结果只在早于水印的时段可见。有没有可能;部分";这段时间的结果还能改变吗?
我的代码(在Flink SQL中运行(
CREATE TABLE KafkaTable
(
`user_id` BIGINT,
`event_ts` TIMESTAMP(3),
WATERMARK FOR `event_ts` AS `event_ts`
) WITH (
'connector' = 'kafka',
'topic' = 'quickstart-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'csv',
'key.fields' = 'user_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY',
'value.json.timestamp-format.standard' = 'ISO-8601'
);
SELECT
window_start,
count(distinct user_id),
count(user_id)
FROM
TABLE(
TUMBLE(TABLE KafkaTable, DESCRIPTOR(event_ts), INTERVAL '1' MINUTES)
)
GROUP BY window_start, window_end;
在kafka中,我有8个分区(0..7(。第N个分区包含事件,从现在起延迟了N分钟。(记录每10秒发送一次(
在结果中,我看到的最新窗口是8分钟前的窗口,包含所有分区的结果。screenshot_from_flink_sql
相反,我希望看到所有窗口,即使窗口的结果可能会发生变化——比如:
+I 2021-11-09 20:04:00.00 8 42
+I 2021-11-09 20:05:00.00 8 42
+I 2021-11-09 20:06:00.00 7 38
+I 2021-11-09 20:07:00.00 6 32
+I 2021-11-09 20:08:00.00 5 26
+I 2021-11-09 20:09:00.00 4 20
+I 2021-11-09 20:10:00.00 3 14
+I 2021-11-09 20:11:00.00 2 8
+I 2021-11-09 20:12:00.00 1 2
这是否可以通过SQL仅使用TableApi?
您可以使用CUMULATE而不是TUMBLE。这将为您提供一系列固定间隔的早期窗口点火。