我有一个来自Kafka的数据流,我想用存储在Hadoop中的Parquet文件中的静态数据来丰富它,并最终写入Filesystem sink。
最初我尝试了一个查找连接,如下所示,
SELECT t1.*,t2.enrichment_data_col from source_stream_table AS t1
LEFT JOIN lookupTable FOR SYSTEM_TIME AS OF t1.proctime AS t2
ON t1.lookup_type = t2.lookup_type
,但得到以下错误
org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.createJoinOperator(StreamExecTemporalJoin.java:292)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.getJoinOperator(StreamExecTemporalJoin.java:254)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.java:179)
接下来,我尝试实现时态表函数LEFT JOINhttps://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#left-outer-join如下所示,
TableDescriptor lookupDescriptor = TableDescriptor.forConnector("filesystem").format(FormatDescriptor.forFormat("parquet").build())
.option("path",lookupFileLocation)
.schema(Schema.newBuilder()
.column("lookup_type", DataTypes.STRING().notNull())
.column("enrichment_data_col",DataTypes.INT())
.columnByExpression("proc_time","PROCTIME()")
.primaryKey("lookup_type")
.build())
.build();
tenv.createTable("lookupTable",lookupDescriptor);
TemporalTableFunction tmpLookup = tenv.from("lookupTable").createTemporalTableFunction($("proc_time"),$("lookup_type"));
tenv.createTemporarySystemFunction("lookupTableFunc",tmpLookup);
SELECT t1.*,t2.enrichment_data_col from source_stream_table AS t1
LEFT OUTER JOIN LATERAL TABLE(lookupTableFunc(t1.proctime)) AS t2
ON TRUE
但是现在得到下面的错误,
org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [I@6f8667bb in [Temporal Table Function]
at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionPrimaryKey(TemporalJoinUtil.scala:383)
at org.apache.flink.table.planner.plan.utils.TemporalJoinUtil$.validateTemporalFunctionCondition(TemporalJoinUtil.scala:365)
时间表函数内部连接语法工作得很好,但我正在寻找一个左连接。
我尝试了Flink版本1.15.1
和1.16.0
在Flink中是否有其他方法来实现时间左连接,或者我在这里错过了一些东西。
您的第一次尝试确实不支持:
目前,暂不支持与最新版本的视图/表进行时态连接时使用的FOR SYSTEM_TIME AS OF语法
基本上,不鼓励处理时间,因为它不是确定的。您可以尝试创建一个事件时间:
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
EDIT: remove second part (false information)