Flink: Temporal join not emitting data



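I'm trying to implement an event-time temporal join, but I don't see any data being emitted from the join. I don't see any runtime exceptions either.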

Flink Version: 1.13

The Kafka topics have only one partition for now.

Here's how I set it up:

There is an "append-only" DataStream (left input / probe side) that looks like this:

{
    "eventType": String,
    "eventTime": LocalDateTime,
    "eventId": String
}

So, before joining them, I convert this DataStream into a Table:

var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
        .column("eventId", DataTypes.STRING())
        .column("eventTime", DataTypes.TIMESTAMP(3))
        .column("eventType", DataTypes.STRING())
        .watermark("eventTime", $("eventTime"))
        .build());

Then I have the "versioned table" (right input / build side), backed by Kafka (a Debezium CDC changelog), which looks like this:

CREATE TABLE metadata (
    id                  VARCHAR,
    eventMetadata       VARCHAR,
    origin_ts           TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR origin_ts AS origin_ts
) WITH (
    'connector'                           = 'kafka',
    'properties.bootstrap.servers'        = 'SERVER_ADDR',
    'properties.group.id'                 = 'SOME_GROUP',
    'topic'                               = 'SOME_TOPIC',
    'scan.startup.mode'                   = 'latest-offset',
    'value.format'                        = 'debezium-json'
)
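Because the right side uses the debezium-json format, each Debezium change event is read as one or more changelog rows; together with the primary key and the watermark on origin_ts, that changelog is what lets metadata act as a versioned table. The sketch below only illustrates how Debezium's op field maps onto Flink's row kinds (written as RowKind short strings). The class and method names are hypothetical, and this is not the connector's actual deserialization code.

```java
import java.util.List;

// Hypothetical helper (not Flink's deserializer): the Flink changelog row kinds,
// written as RowKind short strings, that one Debezium change event turns into.
final class DebeziumOpMapping {

    private DebeziumOpMapping() {}

    static List<String> rowKindsFor(String op) {
        switch (op) {
            case "r": // snapshot read
            case "c": // create
                return List.of("+I"); // INSERT built from the "after" image
            case "u": // update
                return List.of("-U", "+U"); // UPDATE_BEFORE ("before"), then UPDATE_AFTER ("after")
            case "d": // delete
                return List.of("-D"); // DELETE built from the "before" image
            default:
                throw new IllegalArgumentException("unknown Debezium op: " + op);
        }
    }
}
```

So a single update to a metadata row arrives as a retraction of the old row followed by the new row, and each of these becomes part of that key's version history.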

The join query looks like this:

SELECT e.eventId, e.eventTime, e.eventType, m.eventMetadata
FROM events_view AS e
JOIN metadata_view FOR SYSTEM_TIME AS OF e.eventTime AS m
ON e.eventId = m.id
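FOR SYSTEM_TIME AS OF e.eventTime asks, for each event, for the version of the metadata row with the same key that was valid at eventTime, i.e. the latest version whose version time (here origin_ts) is not after the event's time. A minimal in-memory sketch of that lookup, assuming a per-key history kept in a TreeMap; the class and method names are hypothetical, and this is not how Flink actually stores join state.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a versioned table: id -> (version time -> eventMetadata).
final class VersionedTable {
    private final Map<String, TreeMap<LocalDateTime, String>> versionsById = new HashMap<>();

    // Record a version of `id` that became valid at `versionTime` (origin_ts).
    void upsert(String id, LocalDateTime versionTime, String eventMetadata) {
        versionsById.computeIfAbsent(id, k -> new TreeMap<>()).put(versionTime, eventMetadata);
    }

    // FOR SYSTEM_TIME AS OF `asOf`: the latest version whose time is <= asOf,
    // or null if the key has no version yet (an inner join then emits nothing).
    String lookupAsOf(String id, LocalDateTime asOf) {
        TreeMap<LocalDateTime, String> history = versionsById.get(id);
        if (history == null) {
            return null;
        }
        Map.Entry<LocalDateTime, String> version = history.floorEntry(asOf);
        return version == null ? null : version.getValue();
    }
}
```

For example, with version v1 at 10:00 and v2 at 11:00 for the same id, an event at 10:30 gets v1, an event at exactly 11:00 gets v2, and an event at 09:00 gets no match.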

Based on other posts here, I've set the source idle timeout:

table.exec.source.idle-timeout -> 5

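I've also tried setting an idleness time on the watermark to make sure the sources don't hold back the watermark. At this point I can see watermarks being generated, but I still don't get any results.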

Everything just ends up sitting in the temporal join; nothing comes out of it.
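In Flink's event-time temporal join, each probe-side (event) row is held in state until the join operator's watermark reaches the row's time, and a two-input operator's watermark is the minimum of the watermarks of its two inputs. So if the metadata side's watermark never advances (for instance because metadata is read from latest-offset and no new change events arrive, so origin_ts never moves), the events stay buffered and nothing is emitted. The simplified model below illustrates that gating with plain long timestamps. The class name is hypothetical, it deliberately leaves out source idleness, and it is offered as one plausible explanation for the symptom rather than a confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model of watermark gating in an event-time temporal join.
// Times are plain longs (e.g. epoch millis); idleness handling is left out on purpose.
final class WatermarkGatedJoin {
    private long leftWatermark = Long.MIN_VALUE;  // event (probe) side
    private long rightWatermark = Long.MIN_VALUE; // metadata (build) side
    private final PriorityQueue<Long> bufferedLeftRows = new PriorityQueue<>(); // smallest time first
    private final List<Long> emitted = new ArrayList<>();

    void onLeftRow(long rowTime) {
        bufferedLeftRows.add(rowTime); // never emitted on arrival, only buffered
        fireReadyRows();
    }

    void onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
        fireReadyRows();
    }

    void onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        fireReadyRows();
    }

    // The operator's event-time clock is the minimum of both inputs' watermarks.
    long combinedWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    // Release every buffered row whose time the combined watermark has reached.
    // (A real join would look up the version AS OF that time at this point.)
    private void fireReadyRows() {
        long watermark = combinedWatermark();
        while (!bufferedLeftRows.isEmpty() && bufferedLeftRows.peek() <= watermark) {
            emitted.add(bufferedLeftRows.poll());
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    int buffered() {
        return bufferedLeftRows.size();
    }
}
```

In this model, advancing only the event side's watermark releases nothing; rows come out only once the metadata side's watermark catches up with their times.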

So, the problem here was the syntax for a processing-time temporal join. Here's how I solved it:

// register the metadata table as a temporal table function by specifying its time attribute (proc_time) and primary key
// (requires: import static org.apache.flink.table.api.Expressions.$;)
var metadataHistory = tableEnv.from("metadata")
        .createTemporalTableFunction($("proc_time"), $("id"));
tableEnv.createTemporarySystemFunction("metadata_view", metadataHistory);
// SQL processing-time temporal join; the alias of events_view is e, so pass e.procTime
var temporalJoinResult = tableEnv.sqlQuery("SELECT" +
        " e.eventId, e.eventType, e.eventTime, m.eventMetadata" +
        " FROM events_view AS e," +
        " LATERAL TABLE (metadata_view(e.procTime)) AS m" +
        " WHERE e.eventId = m.id");
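With the temporal table function keyed on proc_time, each event is joined against whatever version of its metadata row is current at the moment the event is processed, so nothing waits for watermarks. The sketch below models that behaviour in plain Java; the class and method names are hypothetical and it is not Flink's operator code. Two consequences follow from these semantics: an event whose metadata has not been seen yet finds no match and, since this is an inner join, produces no output; and a later metadata update does not revise rows that were already emitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of a processing-time temporal join: only the latest
// version per key is kept, and each event is joined the moment it arrives.
final class ProcTimeTemporalJoin {
    private final Map<String, String> latestMetadataById = new HashMap<>();

    // An insert or update from the metadata changelog replaces the current version.
    void onMetadataUpsert(String id, String eventMetadata) {
        latestMetadataById.put(id, eventMetadata);
    }

    // In this model a delete removes the key, so later events with this id find no match.
    void onMetadataDelete(String id) {
        latestMetadataById.remove(id);
    }

    // Inner-join semantics: an empty result means the event produces no output row.
    Optional<String> onEvent(String eventId, String eventType) {
        String metadata = latestMetadataById.get(eventId);
        if (metadata == null) {
            return Optional.empty();
        }
        return Optional.of(eventId + "," + eventType + "," + metadata);
    }
}
```

Because this lookup is immediate, the result depends on arrival order: the same input replayed with different timing can join against different metadata versions.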

Here, proc_time on the metadata table needs to be declared in the table DDL, like this:

CREATE TABLE metadata (
    id            VARCHAR,
    eventMetadata VARCHAR,
    proc_time     AS PROCTIME(),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'                           = 'kafka',
    'properties.bootstrap.servers'        = 'SERVER_ADDR',
    'properties.group.id'                 = 'SOME_GROUP',
    'topic'                               = 'SOME_TOPIC',
    'scan.startup.mode'                   = 'latest-offset',
    'value.format'                        = 'debezium-json'
)

Also, when converting the DataStream into a table, add a procTime column to it, like this:

var eventTable = tableEnv.fromDataStream(eventStream, Schema.newBuilder()
        .column("eventId", DataTypes.STRING())
        .column("eventTime", DataTypes.TIMESTAMP(3))
        .column("eventType", DataTypes.STRING())
        .columnByExpression("procTime", "PROCTIME()")
        .build());
