For Flik Kafka SQL源定义:
CREATE TABLE PlayEvents (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`the_kafka_key` STRING,
`song_id` BIGINT NOT NULL,
`duration` BIGINT,
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'play-events',
'properties.bootstrap.servers' = 'localhost:29092',
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8081',
'value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset'
)
每个100ms
向play-events
发送测试PlayEvent
消息使用Avro格式:
protocol `protocol` {
record PlayEvent {
long song_id;
long duration;
}
}
我正在运行一个连续的Window Top-N查询:
Configuration strConf = new Configuration();
strConf.setInteger(RestOptions.PORT, 8089);
strConf.setString(RestOptions.BIND_PORT, "8088-8090");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(strConf);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration tableConfiguration = tableEnv.getConfig().getConfiguration();
tableConfiguration.setString("table.exec.source.idle-timeout", "2 min");
tableEnv.sqlQuery("" +
"SELECT window_end, song_id, play_count FROM ( " +
" SELECT *, " +
" ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY play_count DESC) AS row_num " +
" FROM (" +
" SELECT window_start, window_end, song_id, COUNT(*) AS play_count " +
" FROM TABLE( " +
" TUMBLE(TABLE PlayEvents, DESCRIPTOR(event_time), INTERVAL '60' SECONDS)) " +
" GROUP BY window_start, window_end, song_id " +
" ) " +
") WHERE row_num <= 4 "
).execute().print();
启动后的第一个打印输出显示出一致的结果,每个窗口期正好有4个结果:
...
| +I | 2021-11-04 12:23:00.000 | 3 | 16 |
| +I | 2021-11-04 12:23:00.000 | 6 | 14 |
| +I | 2021-11-04 12:23:00.000 | 1 | 12 |
| +I | 2021-11-04 12:23:00.000 | 11 | 10 |
| +I | 2021-11-04 12:24:00.000 | 9 | 16 |
| +I | 2021-11-04 12:24:00.000 | 6 | 13 |
| +I | 2021-11-04 12:24:00.000 | 7 | 12 |
| +I | 2021-11-04 12:24:00.000 | 5 | 12 |
后续输出显示了一些部分窗口内容:
...
| +I | 2021-11-04 12:25:00.000 | 9 | 12 |
| +I | 2021-11-04 12:25:00.000 | 6 | 6 |
| +I | 2021-11-04 12:27:00.000 | 11 | 18 |
...
例如,窗口每个间隔仅包含1、2、3个元素,并且有时整个间隔被丢弃。
如果我重新启动查询,所有结果将在第一次打印时正确显示(例如,每个窗口正好有4个元素(,然后它将继续显示部分响应。
我尝试了不同的配置,尝试将连续查询写入另一个Kafka主题,然后从那里进行查询。但不一致的部分结果仍然存在。
不确定我是否错过了一些重要的配置,或者这是Flink的已知限制或问题?
附言:我用Flink 1.14在本地机器上运行测试。
当进行事件时间处理的Flink作业表现出不确定性行为时,这是因为处理时间在某种程度上会影响结果。在这种情况下,我的猜测是,实际的无序度超过了水印中配置的1秒间隔,导致了随后被丢弃的延迟事件。
idle-timeout
也可以在这方面发挥作用。当标记为空闲的源恢复传递事件时,很容易出现水印已经移动过来自该先前空闲源的初始事件中的时间戳的情况。即使一秒钟足以处理通常预期的无序,在这种特殊情况下,它也可能不够长。
这个问题似乎与defaultLocalParallelism有关,它等于机器核心的数量(在我的例子中是16
(。结果生成了16个子任务but only one is used
来处理输入数据(输入Kafka主题仅使用1个分区(。
如果我将parallelism
明确设置为1
,则空闲超时和查询结果的不一致性将得到解决:
Configuration strConf = ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, strConf);
或者你可以这样设置:
env.getConfig().setParallelism(1);
附言:虽然这解决了我的演示问题,但我想知道为本地环境配置并行度/最大并行度的正确方法是什么。