我正在使用Flink SQL来计算基于事件时间的窗口分析。一切都很好,直到我的数据源每天晚上都变为空闲,之后直到第二天数据开始再次流动时才产生最后一分钟的结果。
CREATE TABLE input
id STRING,
data BIGINT,
rowtime TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
WITH (
'connector' = 'kafka',
'topic' = 'input',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
SELECT ...
FROM
(SELECT * FROM
TABLE(TUMBLE(TABLE input, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES)))
GROUP BY ..., window_start, window_end
我试过设置table.exec.source.idle-timeout
,但没有帮助。我能做什么?
table.exec.source.idle-timeout
(以及与WatermarkStrategy的DataStream API一起使用的相应withIdleness
构造(检测空闲输入分区,并防止它们阻碍整个水印的进程。然而,为了使整个水印向前发展,必须在某个地方仍然有一些输入。
一些选项:
(1( 接受这个问题,这意味着在观察输入流中较大的时间戳的基础上,等待水印能够正常前进。正如您所指出的,在您的用例中,这可能需要等待几个小时。
(2( 安排输入流以包含保持活动的消息。这样,水印生成器将有证据(基于保活消息中的时间戳(表明它可以推进水印。您必须修改查询以忽略这些无关的事件。
(3( 当作业已完全接受所有日常输入,但尚未生成最终结果集时,停止作业并指定--drain
。这将通过管道发送一个值为MAX_watermark的水印,这将关闭所有挂起的窗口。然后可以重新启动作业。
(4( 实现了一种自定义水印策略,该策略使用处理时间计时器来检测空闲状态,并根据挂钟时间的流逝来人为地提前水印。这将需要将表输入转换为DataStream,在那里添加水印,然后转换回用于窗口化的表。看见https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/例如这些转换。