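I am trying to implement an event-time temporal join, but I do not see any data being emitted from the join. I do not see any runtime exceptions either.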
Flink Version: 1.13
The Kafka topic currently has only one partition.
This is my setup:
There is an append-only DataStream (left input/probe side) whose records look like this:
{
"eventType": String,
"eventTime": LocalDateTime,
"eventId": String
}
Before joining, I convert this DataStream into a table:
var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
.column("eventId", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.column("eventType", DataTypes.STRING())
.watermark("eventTime", $("eventTime"))
.build());
Then I have the "versioned table" (right input/build side), backed by Kafka (Debezium CDC changelog), defined as follows:
CREATE TABLE metadata (
id VARCHAR,
eventMetadata VARCHAR,
origin_ts TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY (id) NOT ENFORCED,
WATERMARK FOR origin_ts AS origin_ts
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'SERVER_ADDR',
'properties.group.id' = 'SOME_GROUP',
'topic' = 'SOME_TOPIC',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json'
)
The join query looks like this:
SELECT e.eventId, e.eventTime, e.eventType, m.eventMetadata
FROM events_view AS e
JOIN metadata_view FOR SYSTEM_TIME AS OF e.eventTime AS m
ON e.eventId = m.id
Based on other posts here, I set the source idle timeout:
table.exec.source.idle-timeout -> 5
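I also tried setting an idleness time on the watermark strategy so that an idle source does not hold back watermark emission. At this point I can see watermarks being generated, but I still get no results.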
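For context, an event-time temporal join only emits a probe-side row once the join operator's watermark has passed that row's event time, and that watermark is the minimum of the watermarks of both inputs. The following is a minimal plain-Java sketch of that gating, not Flink's actual implementation; the class and method names are hypothetical and timestamps are plain longs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: mimics how an event-time temporal join holds back probe rows. */
public class WatermarkGateSketch {
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;
    // Probe-side event times buffered until the combined watermark passes them.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    /** A probe-side row arrives; it is buffered, not emitted. */
    public void onLeftRow(long eventTime) {
        buffered.add(eventTime);
    }

    /** The probe side advances its watermark; returns rows that can now be emitted. */
    public List<Long> onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
        return drain();
    }

    /** The build side advances its watermark; returns rows that can now be emitted. */
    public List<Long> onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        return drain();
    }

    // Emit every buffered row whose event time is covered by BOTH inputs.
    private List<Long> drain() {
        long combined = Math.min(leftWatermark, rightWatermark);
        List<Long> ready = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= combined) {
            ready.add(buffered.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkGateSketch join = new WatermarkGateSketch();
        join.onLeftRow(100L);
        System.out.println(join.onLeftWatermark(200L));  // [] because the build side has not advanced
        System.out.println(join.onRightWatermark(150L)); // [100]
    }
}
```

Under this model, a build side that reads from 'latest-offset' and rarely receives changelog records would keep the combined watermark from advancing, which is one plausible reason for seeing no output.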
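All rows ended up being held inside the temporal join instead of being emitted. The issue turned out to be the syntax for a processing-time temporal join. Here is how to fix it: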
// register the metadata table as a temporal table function by specifying its time attribute (proc_time) and primary key (id)
var metadataHistory = tableEnv.from("metadata")
.createTemporalTableFunction($("proc_time"), $("id"));
tableEnv.createTemporarySystemFunction("metadata_view", metadataHistory);
// sql processing time temporal join
var temporalJoinResult = tableEnv.sqlQuery("SELECT" +
" e.eventId, e.eventType, e.eventTime, m.eventMetadata" +
" FROM events_view AS e," +
" LATERAL TABLE (metadata_view(e.procTime)) AS m" +
" WHERE e.eventId = m.id");
Here, proc_time on the metadata table needs to be declared in the table DDL, as follows:
CREATE TABLE metadata (
id VARCHAR,
eventMetadata VARCHAR,
proc_time as PROCTIME(),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'SERVER_ADDR',
'properties.group.id' = 'SOME_GROUP',
'topic' = 'SOME_TOPIC',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'debezium-json'
)
Also, when converting the DataStream to a table, add a procTime column to that table as well, as follows:
var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
.column("eventId", DataTypes.STRING())
.column("eventTime", DataTypes.TIMESTAMP(3))
.column("eventType", DataTypes.STRING())
.columnByExpression("procTime", "PROCTIME()")
.build());
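To make the semantics of this workaround concrete: a processing-time temporal join pairs each event with whatever version of the metadata row is current at the moment the event is processed, so no watermarks are involved. Below is a rough plain-Java sketch of that behavior, not Flink code; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative only: a processing-time temporal join as a latest-version lookup. */
public class ProcTimeJoinSketch {
    // Latest eventMetadata per primary key (id), updated as changelog rows arrive.
    private final Map<String, String> latestById = new HashMap<>();

    /** An insert or update from the changelog replaces the current version. */
    public void onMetadataUpsert(String id, String eventMetadata) {
        latestById.put(id, eventMetadata);
    }

    /** A delete from the changelog removes the current version. */
    public void onMetadataDelete(String id) {
        latestById.remove(id);
    }

    /** Inner-join semantics: empty when no version exists at lookup time. */
    public Optional<String> onEvent(String eventId) {
        return Optional.ofNullable(latestById.get(eventId));
    }

    public static void main(String[] args) {
        ProcTimeJoinSketch join = new ProcTimeJoinSketch();
        System.out.println(join.onEvent("a").isPresent()); // false: no metadata seen yet
        join.onMetadataUpsert("a", "v1");
        join.onMetadataUpsert("a", "v2");
        System.out.println(join.onEvent("a").get());       // v2: the latest version wins
    }
}
```

The trade-off in this model is that results depend on arrival order: an event processed before its metadata row has arrived finds no match, and reprocessing the same input can give different results.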