Flink中的新时态表看起来很棒,但我还没能让它们发挥作用。由于我找不到任何可行的例子,我想知道是否有其他人能做到这一点,并能指出我做错了什么。
这里有一点上下文:
查询:
SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id
设置:
// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);
// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);
我没有收到任何争吵和错误。我曾尝试通过更改注册为temporal的表来翻转查询,但没有成功。我还看了"ts"栏,得到了让我相信我应该至少排几行的日期。
感谢您的帮助。
附言:我在kafka的历史数据上运行这个,在"id"上分区,这也是行密钥
您可以在这里以测试的形式找到完全可用的代码"示例"(这两个测试的内容(处理时间和事件时间)在这里和这里或这里的文档中或多或少重复)。您可以从这些示例开始,然后逐步将它们转换为您的确切用例/场景。首先从预定义的数据集开始,然后再从Kafka中进行读取,这可能是有益的。
关于你的问题,从你的代码片段中还不清楚出了什么问题,一些潜在的问题:
- 水印未分配/未增加(链接的
testEventTimeInnerJoin()
中的assignTimestampsAndWatermarks()
调用)。TemporalJoin操作符只在水印上发出数据 - 您尝试加入的那两个表之间的行时间不同步。如果
site
没有足够旧的行可以与sitemembership
记录连接,则结果将为空。例如,如果来自site
的所有记录都具有来自年份2019
的时间字段,而sitemembership
仅具有来自2018
的记录