如何在Flink表API中与滑动窗口函数一起连接两个数据流?



我有两个来自两个Kafka主题的流表,我想连接这些流并对连接的数据执行聚合功能。流需要使用滑动窗口来连接。在加入和窗口的数据,我得到一个错误Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

下面是代码片段
select cep.payload['id'] , ep.payload['id'] ,
ep.event_flink_time,
ep.rowtime,
TIMESTAMPDIFF(SECOND, ep.event_flink_time, cep.event_flink_time) as timediff,
HOP_START (cep.event_flink_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) as hop_start,
HOP_END (cep.event_flink_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) as hop_end
FROM table1 cep
JOIN table2 ep
ON cep.payload['id'] = ep.payload['id']
group by HOP(cep.event_flink_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES), cep.payload, ep.payload, cep.event_flink_time, ep.event_flink_time,
ep.rowtime

我使用AWS齐柏林笔记本和使用Flink SQL表API。对于流数据,如何使用滑动窗口函数连接数据?或者我应该为流数据和窗口函数使用不同类型的连接吗?下面是同样错误的罚单:https://issues.apache.org/jira/browse/FLINK-10211

从你的执行sql,我建议你把这个任务分成两个部分。一种是"左关节"通过两个流数据源,然后执行"分组"遵循创建视图。除此之外,还要确认事件时间属性的类型是否正确。

流式SQL依赖于不再需要产生结果的过期状态的时间属性。这在特定的时间查询上下文中是有意义的,因为输入和输出记录上的时间戳都在向前移动——这发生在诸如窗口和间隔连接之类的查询中。

常规连接(没有任何时间约束的连接)不会以这种方式工作。任何先前摄取的记录都可能在任何时间点更新,然后需要更新相应的输出记录。这意味着两个输入流必须在Flink状态下完全具体化,并且输出流没有下游操作可以利用的时间顺序来进行状态保留优化。

考虑到这一切是如何工作的,Flink的流SQL规划器不能处理在常规连接之后有一个窗口——常规连接不能产生时间属性,而HOP坚持要有它们。

一个可能的解决方案是将连接重新表述为间隔连接,如果这将满足您的需要。

相关内容

  • 没有找到相关文章