如何使用Flink时态表



Flink中的新时态表看起来很棒,但我还没能让它们发挥作用。由于我找不到任何可行的例子,我想知道是否有其他人能做到这一点,并能指出我做错了什么。

这里有一点上下文:

查询:

SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id

设置:

// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);
// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);

我没有收到任何争吵和错误。我曾尝试通过更改注册为temporal的表来翻转查询,但没有成功。我还看了"ts"栏,得到了让我相信我应该至少排几行的日期。

感谢您的帮助。

附言:我在kafka的历史数据上运行这个,在"id"上分区,这也是行密钥

您可以在这里以测试的形式找到完全可用的代码"示例"(这两个测试的内容(处理时间和事件时间)在这里和这里或这里的文档中或多或少重复)。您可以从这些示例开始,然后逐步将它们转换为您的确切用例/场景。首先从预定义的数据集开始,然后再从Kafka中进行读取,这可能是有益的。

关于你的问题,从你的代码片段中还不清楚出了什么问题,一些潜在的问题:

  • 水印未分配/未增加(链接的testEventTimeInnerJoin()中的assignTimestampsAndWatermarks()调用)。TemporalJoin操作符只在水印上发出数据
  • 您尝试加入的那两个表之间的行时间不同步。如果site没有足够旧的行可以与sitemembership记录连接,则结果将为空。例如,如果来自site的所有记录都具有来自年份2019的时间字段,而sitemembership仅具有来自2018的记录

相关内容

  • 没有找到相关文章

最新更新