实现Flink SQL处理时间临时左连接错误



我有一个来自Kafka的数据流,我想用存储在Hadoop中的Parquet文件中的静态数据来丰富它,并最终写入Filesystem sink。

最初我尝试了一个查找连接,如下所示,

SELECT t1.*,t2.enrichment_data_col from source_stream_table AS t1 
LEFT JOIN lookupTable FOR SYSTEM_TIME AS OF t1.proctime AS t2 
ON t1.lookup_type = t2.lookup_type

,但得到以下错误

org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.createJoinOperator(StreamExecTemporalJoin.java:292)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.getJoinOperator(StreamExecTemporalJoin.java:254)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.java:179)

接下来,我尝试实现时态表函数LEFT JOINhttps://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#left-outer-join如下所示,


TableDescriptor lookupDescriptor = TableDescriptor.forConnector("filesystem").format(FormatDescriptor.forFormat("parquet").build())
.option("path",lookupFileLocation)
.schema(Schema.newBuilder()
.column("lookup_type", DataTypes.STRING().notNull())
.column("enrichment_data_col",DataTypes.INT())
.columnByExpression("proc_time","PROCTIME()")
.primaryKey("lookup_type")
.build())
.build();
tenv.createTable("lookupTable",lookupDescriptor);

TemporalTableFunction tmpLookup = tenv.from("lookupTable").createTemporalTableFunction($("proc_time"),$("lookup_type"));
tenv.createTemporarySystemFunction("lookupTableFunc",tmpLookup);

SELECT t1.*,t2.enrichment_data_col from source_stream_table AS t1 
LEFT OUTER JOIN LATERAL TABLE(lookupTableFunc(t1.proctime)) AS t2 
ON TRUE

但是现在得到下面的错误,

org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [I@6f8667bb in [Temporal Table Function]
at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionPrimaryKey(TemporalJoinUtil.scala:383)
at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionCondition(TemporalJoinUtil.scala:365)

时间表函数内部连接语法工作得很好,但我正在寻找一个左连接。

我尝试了Flink版本1.15.11.16.0

在Flink中是否有其他方法来实现时间左连接,或者我在这里错过了一些东西。

您的第一次尝试确实不支持:

目前,暂不支持与最新版本的视图/表进行时态连接时使用的FOR SYSTEM_TIME AS OF语法

基本上,不鼓励处理时间,因为它不是确定的。您可以尝试创建一个事件时间:

order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND

EDIT: remove second part (false information)

相关内容

  • 没有找到相关文章

最新更新