AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin])



When I execute the following statement with Flink SQL, the error below is reported:

Requirement

Group the data in user_behavior_kafka_table by the user_id field, then take the row with the largest ts value in each group.

Executed SQL

SELECT user_id, item_id, ts
FROM user_behavior_kafka_table AS a
WHERE ts = (SELECT MAX(b.ts)
            FROM user_behavior_kafka_table AS b
            WHERE a.user_id = b.user_id);
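
For context, a minimal sketch of how the Kafka source table might be declared in Flink 1.11 DDL; the topic name, broker address, and connector options are assumptions, not details from the original post. The query result is then written to user_behavior_hive_table, which the error message identifies as an append-only sink (AppendStreamTableSink).

CREATE TABLE user_behavior_kafka_table (
  user_id STRING,
  item_id STRING,
  `comment` STRING,
  ts BIGINT
) WITH (
  'connector' = 'kafka',                             -- assumed connector options
  'topic' = 'user_behavior',                         -- hypothetical topic name
  'properties.bootstrap.servers' = 'localhost:9092', -- hypothetical broker address
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);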

Flink version

1.11.2

Error message

AppendStreamTableSink doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=[((user_id = user_id0) AND (ts = EXPR$0))], select=[user_id, item_id, ts, user_id0, EXPR$0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])

Job deployment

On YARN

Table data

  • user_behavior_kafka_table: data consumed from a Kafka topic

{"user_id":"aaa","item_id";:"11-222-333","comment":"在"处的aaa访问项,"ts":100}

{"user_id":"ccc","item_id";:"11-222-334","comment":"cc访问项在","ts":200}

{"user_id":"ccc","item_id";:"11-222-333","comment":"cc访问项在","ts":300}

{"user_id":"bbb","item_id";:"11-222-334","comment":"bbs访问项在","ts":200}

{"user_id":"aaa","item_id";:"11-222-333","comment":"在"处的aaa访问项,"ts":200}

{"user_id":"aaa","item_id";:"11-222-334","comment":"在"处的aaa访问项,"ts":400}

{"user_id":"ccc","item_id";:"11-222-333","comment":"cc访问项在","ts":400}

{"user_id":"vvv","item_id";:"11-222-334","comment":"在"处的vvv访问项,"ts":200}

{"user_id":"bbb","item_id","11-222-333","comment":"bbs访问项在","ts":300}

{"user_id":"aaa","item_id";:"11-222-334","comment":"在"处的aaa访问项,"ts":300}

{"user_id":"ccc","item_id";:"11-222-333","comment":"cc访问项在","ts":100}

{"user_id":"bbb","item_id";:"11-222-334","comment":"bbs访问项在","ts":100}

  • user_behavior_hive_table: expected result

{"user_id":"aaa","item_id";:"11-222-334","comment":"在"处的aaa访问项,"ts":400}

{"user_id":"bbb","item_id","11-222-333","comment":"bbs访问项在","ts":300}

{"user_id":"ccc","item_id";:"11-222-333","comment":"cc访问项在","ts":400}

{"user_id":"vvv","item_id";:"11-222-334","comment":"在"处的vvv访问项,"ts":200}

To get the expected result from this query, it would have to be executed in batch mode. As a streaming query, the Flink SQL planner cannot handle it, and even if it could, it would produce a stream of results in which the last result for each user_id matches the expected output, but with additional intermediate results as well.

For example, for user aaa, the following results would appear:

aaa 11-222-333 100
aaa 11-222-333 200
aaa 11-222-334 400

The row with ts=300, however, would be skipped, because it is never the row with the maximum ts.

If you want this to work in streaming mode, try reformulating it as a top-n query:

SELECT user_id, item_id, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;
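
As a usage sketch, the top-n query could be wrapped in an INSERT INTO targeting the sink from the question. This assumes user_behavior_hive_table has columns matching the SELECT list; whether the append-only sink accepts the result still depends on the planner treating the query as a deduplication.

-- sketch only: sink name and schema taken from the question, not verified
INSERT INTO user_behavior_hive_table
SELECT user_id, item_id, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) AS row_num
  FROM user_behavior_kafka_table)
WHERE row_num = 1;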

I believe this should work, but I am not able to test it easily.
