我正在尝试使用 Flink SQL 运行流式 top-n 查询,但无法获得 Flink 文档中概述的"优化版本"工作。设置如下:
我有一个 Kafka 主题,其中每条记录包含一个元组(GUID、达到分数、最大可能分数(。把它们想象成一个学生参加评估,元组代表他获得了多少分。
我想要得到的是五个 GUID 的列表,其中最高分数以百分比形式衡量(即按 SUM(reached_score(/SUM(最大可能分数(排序(。
我首先聚合分数并按 GUID 对它们进行分组:
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);
Table aggregatedScores = tableEnv.sqlQuery(
"SELECT " +
" guid, " +
" SUM(reached_score) as reached_score, " +
" SUM(max_score) as max_score, " +
" SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
"FROM scores " +
"GROUP BY guid");
tableEnv.registerTable("agg_scores", aggregatedScores);
生成的表包含聚合分数的未排序列表。然后,我尝试将其输入到 Top-N 查询中,因为它在 Flink 文档中使用:
Table topN = tableEnv.sqlQuery(
"SELECT guid, reached_score, max_score, score, row_num " +
"FROM (" +
" SELECT *," +
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
" FROM agg_scores)" +
"WHERE row_num <= 5");
tableEnv.toRetractStream(topN, Row.class).print();
运行此查询将按预期运行,如果元素的顺序发生更改,则会导致多次更新。
// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
然后,我遵循了文档的建议,并从投影中删除了row_number:
Table topN = tableEnv.sqlQuery(
"SELECT guid, reached_score, max_score, score " +
"FROM (" +
" SELECT *," +
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
" FROM agg_scores)" +
"WHERE row_num <= 5");
运行类似的数据集:
// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)
// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
我不明白的是:
- 为什么第一个条目(
63992935-9684-4285-8c2b-1fd57b51b48f
(被删除并再次添加/仍然被触摸 - 为什么先添加第二个条目(第二次(,然后删除。这不会导致它在技术上从数据流中删除吗?
两者显然都与排序更改的顺序有关,但这不是优化的top-n查询(在文档中进一步写(应该解决的问题吗?
我已经检查了这个问题,也可以在我的本地环境中重现。我也做了一些调查,原因是:
"我们没有针对某些场景进行此类优化,您的案例似乎是其中之一"。
但是,根据用户文档,我认为在您的场景中也包含此类优化是有效的请求。对我来说,这看起来像一个错误,我们声称进行了一些优化,但没有成功。
我创建了一个问题:https://issues.apache.org/jira/browse/FLINK-15497 跟踪此问题,希望我们可以在即将推出的 1.9.2 和 1.10.0 版本中修复它。
感谢您报告此情况。