When I execute the following statement with Flink SQL, the following error is reported:
Requirement
Group the data in user_behavior_kafka_table by the user_id field, then take the row with the largest ts value in each group.
SQL executed
SELECT user_id,item_id,ts FROM user_behavior_kafka_table AS a
WHERE ts = (select max(b.ts)
FROM user_behavior_kafka_table AS b
WHERE a.user_id = b.user_id );
Flink version
1.11.2
Error message
AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
Job deployment
On YARN
Table information
- user_behavior_kafka_table: data consumed from a Kafka topic
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":100}
{"user_id":"ccc","item_id":"11-222-334","comment":"cc access item at","ts":200}
{"user_id":"ccc","item_id":"11-222-333","comment":"cc access item at","ts":300}
{"user_id":"bbb","item_id":"11-222-334","comment":"bbs access item at","ts":200}
{"user_id":"aaa","item_id":"11-222-333","comment":"aaa access item at","ts":200}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":400}
{"user_id":"ccc","item_id":"11-222-333","comment":"cc access item at","ts":400}
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv access item at","ts":200}
{"user_id":"bbb","item_id":"11-222-333","comment":"bbs access item at","ts":300}
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":300}
{"user_id":"ccc","item_id":"11-222-333","comment":"cc access item at","ts":100}
{"user_id":"bbb","item_id":"11-222-334","comment":"bbs access item at","ts":100}
- user_behavior_hive_table: expected result
{"user_id":"aaa","item_id":"11-222-334","comment":"aaa access item at","ts":400}
{"user_id":"bbb","item_id":"11-222-333","comment":"bbs access item at","ts":300}
{"user_id":"ccc","item_id":"11-222-333","comment":"cc access item at","ts":400}
{"user_id":"vvv","item_id":"11-222-334","comment":"vvv access item at","ts":200}
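To make the intended (batch) semantics of the correlated subquery concrete, here is a minimal plain-Python sketch, not Flink code. It assumes the sample rows above are the complete, bounded input and drops the comment field; the names ROWS and latest_per_user are hypothetical. It keeps every row whose ts equals the maximum ts of its user_id, which reproduces the expected table.

```python
# Sample rows from user_behavior_kafka_table as (user_id, item_id, ts);
# the comment field is omitted. Order matches the arrival order above.
ROWS = [
    ("aaa", "11-222-333", 100),
    ("ccc", "11-222-334", 200),
    ("ccc", "11-222-333", 300),
    ("bbb", "11-222-334", 200),
    ("aaa", "11-222-333", 200),
    ("aaa", "11-222-334", 400),
    ("ccc", "11-222-333", 400),
    ("vvv", "11-222-334", 200),
    ("bbb", "11-222-333", 300),
    ("aaa", "11-222-334", 300),
    ("ccc", "11-222-333", 100),
    ("bbb", "11-222-334", 100),
]


def latest_per_user(rows):
    """Batch reading of the query: keep rows whose ts equals the
    maximum ts seen for the same user_id over the whole input."""
    max_ts = {}
    for user_id, _, ts in rows:
        max_ts[user_id] = max(ts, max_ts.get(user_id, ts))
    # Sorting only makes the output order deterministic for printing.
    return sorted(r for r in rows if r[2] == max_ts[r[0]])


for row in latest_per_user(ROWS):
    print(row)
```

This only works because the whole input is available before any result is produced, which is exactly what a batch job guarantees and a streaming job does not.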
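To get the result you expect from this query, it needs to be executed in batch mode. As a streaming query, the Flink SQL planner cannot handle it, and even if it could, it would produce a stream of results in which the last result for each user_id matches the expected result, but with additional intermediate results as well.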
For example, for user aaa, these results would appear:
aaa 11-222-333 100
aaa 11-222-333 200
aaa 11-222-334 400
but the row with ts=300 would be skipped, because it was never the row with the maximum ts value.
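The intermediate results can be illustrated with a small plain-Python simulation, again not Flink code. It assumes rows arrive one at a time in the order listed in the question and models the output as a simple retract stream: ('+', row) when a row becomes its user's current maximum, and ('-', row) when a previously emitted row is superseded. The names are hypothetical and the exact change kinds Flink emits may differ; the point is that earlier output has to be withdrawn, which an append-only sink cannot accept.

```python
# Sample rows as (user_id, item_id, ts), in arrival order.
ROWS = [
    ("aaa", "11-222-333", 100), ("ccc", "11-222-334", 200),
    ("ccc", "11-222-333", 300), ("bbb", "11-222-334", 200),
    ("aaa", "11-222-333", 200), ("aaa", "11-222-334", 400),
    ("ccc", "11-222-333", 400), ("vvv", "11-222-334", 200),
    ("bbb", "11-222-333", 300), ("aaa", "11-222-334", 300),
    ("ccc", "11-222-333", 100), ("bbb", "11-222-334", 100),
]


def stream_changelog(rows):
    """Emit ('+', row) when a row becomes the max-ts row of its user and
    ('-', old_row) when an earlier max-ts row is superseded.
    Ties are ignored for simplicity; the sample data has none."""
    current = {}    # user_id -> row currently emitted as the maximum
    changelog = []
    for row in rows:
        user_id, _, ts = row
        prev = current.get(user_id)
        if prev is None or ts > prev[2]:
            if prev is not None:
                changelog.append(("-", prev))  # retract the old maximum
            changelog.append(("+", row))
            current[user_id] = row
        # A row that is not a new maximum (e.g. aaa, ts=300) emits nothing.
    return changelog


for op, row in stream_changelog(ROWS):
    if row[0] == "aaa":
        print(op, *row)
```

For user aaa this prints insertions for ts 100, 200 and 400, with retractions of 100 and 200 in between; the ts=300 row never appears.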
If you want this to work in streaming mode, try reformulating it as a Top-N query:
SELECT user_id, item_id, ts
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
FROM user_behavior_kafka_table)
WHERE row_num = 1;
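What ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) followed by row_num = 1 computes can be sketched in plain Python over the bounded sample input. This is an illustration of the window-function semantics only, with hypothetical names; Python's stable sort breaks ties by arrival order, whereas Flink gives no such guarantee. Note also that Flink's Top-N documentation describes Top-N results as updating in streaming mode, so the sink may still need to accept updates.

```python
from collections import defaultdict

# Sample rows as (user_id, item_id, ts), in arrival order.
ROWS = [
    ("aaa", "11-222-333", 100), ("ccc", "11-222-334", 200),
    ("ccc", "11-222-333", 300), ("bbb", "11-222-334", 200),
    ("aaa", "11-222-333", 200), ("aaa", "11-222-334", 400),
    ("ccc", "11-222-333", 400), ("vvv", "11-222-334", 200),
    ("bbb", "11-222-333", 300), ("aaa", "11-222-334", 300),
    ("ccc", "11-222-333", 100), ("bbb", "11-222-334", 100),
]


def top1_per_user(rows):
    """Emulate ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC)
    and keep only rows with row_num = 1."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[0]].append(row)          # PARTITION BY user_id
    result = []
    for user_id in sorted(partitions):
        ordered = sorted(partitions[user_id], key=lambda r: r[2], reverse=True)  # ORDER BY ts DESC
        numbered = enumerate(ordered, start=1)  # ROW_NUMBER() starts at 1
        result.extend(row for row_num, row in numbered if row_num == 1)  # WHERE row_num = 1
    return result


for row in top1_per_user(ROWS):
    print(row)
```

On the sample data this yields one row per user_id, matching the expected user_behavior_hive_table contents.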
I believe this should work, but I can't easily test it.