我为用户提供了一个 Flink SQL 接口,所以我不能真正使用 Table 或 Java/Scala 接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别的 API 指令。
一个用户如何转换,说:
SELECT b, AVG(a) "average" FROM source_data GROUP BY b
name: average_source_data_retracting
b STRING
average NUMERIC
-这是将值缩回-到将附加它们的形式中。此追加表单可以具有以下架构:
name: average_source_data_appending
flag BOOLEAN <-- indicating an accumulate or retract message
b STRING
average NUMERIC
Aka 有点像 RetractStreamTableSink 相当于 AppendStreamTableSink,但没有它是一个接收器。
所有这些都是为了使使用 average_source_data_appending 创建临时表(过滤收回消息(成为可能,但这种表只接受仅追加源表。
我已经考虑过使用窗口(如此处所述(,但我希望对临时表的更新是即时的。
请忽略这个问题,显然时态表函数可以接受(对我来说(收回的表。
大意如下:
SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b
可以接受作为时态表函数,其中 b 作为键,max_proctime 作为时间属性。我想 MAX(proctime( 以某种方式让它认为新行被发出,当它们只是被覆盖时?我想我需要更多的时间来理解这一点。
编辑:
通过挖掘源代码,我们发现时态表函数似乎接受收回定义,但前提是它在处理时间内:
TemporalProcessTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
rightState.update(element.getValue());
registerProcessingCleanupTimer();
} else {
rightState.clear();
cleanupLastTimer();
}
}
TemporalRowTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
...
checkNotRetraction(row);
...
}
private void checkNotRetraction(BaseRow row) {
if (BaseRowUtil.isRetractMsg(row)) {
String className = getClass().getSimpleName();
throw new IllegalStateException(
"Retractions are not supported by " + className +
". If this can happen it should be validated during planning!");
}
}
这是没有记录的;我不知道这是否是永久性的,以及文档是否会更新。