Flink SQL CURRENT_TIMESTAMP始终返回相同的值



我在 Flink 1.8 中使用 Flink SQL API。我有两个流表表 1 和表 2。

如果我们将receivedTime定义为表中接收数据的时间,我想连接表1和表2(在某些id上(,并仅保留Table1.receivedTime > Table2.receivedTime的行。

首先,我尝试使用 Flink SQLCURRENT_TIMESTAMP来执行此操作:

NEW_TABLE1 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE2
RESULT     : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2 
WHERE NEW_TABLE1.id = NEW_TABLE2.id 
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime

但看起来CURRENT_TIMESTAMP总是返回评估查询时的时间戳。(看起来此时CURRENT_TIMESTAMP已替换为当前日期,并且不是动态值(。我觉得这种行为很奇怪,正常吗?

我尝试的第二个解决方案是使用 Flink 的处理时间:

NEW_TABLE1 : SELECT *, proctime as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, proctime as receivedTime FROM TABLE2
RESULT     : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2 
WHERE NEW_TABLE1.id = NEW_TABLE2.id 
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime

但在本例中,处理时间似乎是在执行查询时计算的。然后,在我的 JOIN 查询中,两个处理时间始终相等。

做我想做的事的正确方法是什么?

Flink 和 Flink SQL 支持两种不同的时间概念:处理时间是处理事件的时间(或者换句话说,执行查询的时间(,而事件时间基于事件中记录的时间戳。文档的此处介绍了如何在表和 SQL API 中反映这种区别。

要获得所需的内容,首先需要安排在两个表中创建数据的任何过程,以在每条记录中包含事件时间时间戳。然后你需要配置你的表,以便 Flink SQL 知道每个表中的哪个字段将用作 rowtime 属性,并且你还需要指定如何进行水印。

例如,如果您使用的是 SQL 客户端,则您的架构可能如下所示,以指示 rideTime 字段应用作事件时间时间戳以及使用 60 秒延迟的定期有界乱序水印策略:

schema:
- name: rowTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "rideTime"
watermarks:
type: "periodic-bounded"
delay: "60000"

如果您没有使用 SQL 客户端,请参阅文档以获取示例,无论是使用数据流到表的转换还是表源。

更新:

我认为,你真正喜欢的是使用摄取时间,但 Flink SQL 不支持摄取时间。您必须将作业配置为使用TimeCharacteristic.EventTime,实现时间戳提取器和水印生成器,并调用assignTimestampsAndWatermarks

如果您不想为每个事件中的时间戳字段而烦恼,时间戳提取器可能如下所示:

AssignerWithPeriodicWatermarks<Event> assigner = new AscendingTimestampExtractor<Event> {
@Override
public long extractAscendingTimestamp(Event element) {
return System.currentTimeMillis();
}
};

相关内容

  • 没有找到相关文章

最新更新