Flink:将收回的 SQL 转换为追加 SQL,仅使用 SQL,以馈送临时表



我为用户提供了一个 Flink SQL 接口,所以我不能真正使用 Table 或 Java/Scala 接口。一切都需要在 SQL 中指定。不过,我可以解析 SQL 文件中的注释,并添加指定的临时较低级别的 API 指令。

一个用户如何转换,说:

SELECT b, AVG(a) "average" FROM source_data GROUP BY b
name: average_source_data_retracting
b STRING
average NUMERIC

-这是将值缩回-到将附加它们的形式中。此追加表单可以具有以下架构:

name: average_source_data_appending
flag BOOLEAN <-- indicating an accumulate or retract message
b STRING
average NUMERIC

Aka 有点像 RetractStreamTableSink 相当于 AppendStreamTableSink,但没有它是一个接收器。

所有这些都是为了使使用 average_source_data_appending 创建临时表(过滤收回消息(成为可能,但这种表只接受仅追加源表。

我已经考虑过使用窗口(如此处所述(,但我希望对临时表的更新是即时的。

请忽略这个问题,显然时态表函数可以接受(对我来说(收回的表。

大意如下:

SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b

可以接受作为时态表函数,其中 b 作为键,max_proctime 作为时间属性。我想 MAX(proctime( 以某种方式让它认为新行被发出,当它们只是被覆盖时?我想我需要更多的时间来理解这一点。

编辑:

通过挖掘源代码,我们发现时态表函数似乎接受收回定义,但前提是它在处理时间内:

TemporalProcessTimeJoinOperator.java:

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
rightState.update(element.getValue());
registerProcessingCleanupTimer();
} else {
rightState.clear();
cleanupLastTimer();
}
}

TemporalRowTimeJoinOperator.java:

@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
...
checkNotRetraction(row);
...
}
private void checkNotRetraction(BaseRow row) {
if (BaseRowUtil.isRetractMsg(row)) {
String className = getClass().getSimpleName();
throw new IllegalStateException(
"Retractions are not supported by " + className +
". If this can happen it should be validated during planning!");
}
}

这是没有记录的;我不知道这是否是永久性的,以及文档是否会更新。

相关内容

  • 没有找到相关文章

最新更新