Flink temporal join only works for a few seconds

I'm trying to implement an event-time temporal join in Flink. Here is the first table in the join:

tEnv.executeSql("CREATE TABLE AggregatedTrafficData_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3)," +
"`area` STRING," +
"`networkEdge` STRING," +
"`vehiclesNumber` BIGINT," +
"`averageSpeed` INTEGER," +
"WATERMARK FOR `timestamp` AS `timestamp`" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.trafficdata.aggregated'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'json'," +
"'json.timestamp-format.standard' = 'ISO-8601'" +
")");

This table is used as a sink for the following query:

Table aggregatedTrafficData = trafficData
        .window(Slide.over(lit(30).seconds())
                .every(lit(15).seconds())
                .on($("timestamp"))
                .as("w"))
        .groupBy($("w"), $("networkEdge"), $("area"))
        .select(
                $("w").end().as("timestamp"),
                $("area"),
                $("networkEdge"),
                $("plate").count().as("vehiclesNumber"),
                $("speed").avg().as("averageSpeed")
        );
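
For context, writing this aggregate into the Kafka sink table would look something like the statement below. This is a sketch, not necessarily the exact call used in the job:

// Assumed wiring (not shown above): emit the windowed aggregate into the sink table
aggregatedTrafficData.executeInsert("AggregatedTrafficData_Kafka");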

Here is the other table in the join. I use Debezium to stream a Postgres table into Kafka:

tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
"`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
"`urn` STRING," +
"`flow_rate` INTEGER," +
"PRIMARY KEY(`urn`) NOT ENFORCED," +
"WATERMARK FOR `timestamp` AS `timestamp`" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.network.transport_network_edge'," +
"'scan.startup.mode' = 'latest-offset'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'traffic-data-aggregation-job'," +
"'format' = 'debezium-json'," +
"'debezium-json.schema-include' = 'true'" +
")");

Finally, here is the temporal join:

Table transportNetworkCongestion = tEnv.sqlQuery(
        "SELECT AggregatedTrafficData_Kafka.`timestamp`, `networkEdge`, " +
        "congestion(`vehiclesNumber`, `flow_rate`) AS `congestion` " +
        "FROM AggregatedTrafficData_Kafka " +
        "JOIN TransportNetworkEdge_Kafka FOR SYSTEM_TIME AS OF AggregatedTrafficData_Kafka.`timestamp` " +
        "ON AggregatedTrafficData_Kafka.`networkEdge` = TransportNetworkEdge_Kafka.`urn`");

The problem I'm having is that the join only works for the first few seconds (right after the Postgres table is updated), but I need the first table to be continuously joined against the Debezium table. Am I doing something wrong? Thanks

Temporal joins with the AS OF syntax you are using require:

  • an append-only table with a valid event-time attribute
  • an updating table with a primary key and a valid event-time attribute
  • an equality predicate on the primary key

When Flink SQL's temporal operators are applied to event-time streams, watermarks play an essential role in determining both when results are produced and when state is cleared.

When executing a temporal join:

  • rows from the append-only table are buffered in Flink state until the current watermark of the join operator reaches their timestamps
  • for the versioned table, for each key, the latest version whose timestamp precedes the join operator's current watermark is kept in state, plus any versions from after the current watermark
  • whenever the join operator's watermark advances, new results are produced, and state that is no longer relevant is cleared

The join operator keeps track of the watermarks it has received from its two input channels, and its current watermark is always the minimum of those two. This is why your join stalls and only makes progress when flow_rate is updated.
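
To make the stall mechanism concrete, here is a minimal standalone sketch in plain Java of how a two-input operator's watermark tracks the minimum of its inputs' watermarks. The class and the timestamp values are hypothetical and this is not Flink's actual implementation:

// Hypothetical sketch: a two-input operator's current watermark is the
// minimum of the latest watermarks seen on each input (not Flink source code).
public class MinWatermarkTracker {
    private long leftWatermark = Long.MIN_VALUE;   // append-only (traffic) input
    private long rightWatermark = Long.MIN_VALUE;  // versioned (Debezium) input

    public void onLeftWatermark(long ts) {
        leftWatermark = Math.max(leftWatermark, ts);
    }

    public void onRightWatermark(long ts) {
        rightWatermark = Math.max(rightWatermark, ts);
    }

    public long currentWatermark() {
        return Math.min(leftWatermark, rightWatermark);
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker();
        tracker.onLeftWatermark(60_000L);   // traffic aggregates keep advancing
        tracker.onRightWatermark(1_000L);   // Debezium side only advances on updates
        // The operator watermark is stuck at the smaller value, so no new join
        // results are emitted until the versioned side produces a newer watermark.
        System.out.println(tracker.currentWatermark()); // prints 1000
        tracker.onLeftWatermark(120_000L);  // further progress on the traffic side only
        System.out.println(tracker.currentWatermark()); // still 1000
    }
}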

One way to work around this is to define the watermark for the TransportNetworkEdge_Kafka table like this:

WATERMARK FOR `timestamp` AS CAST('9999-12-31 00:00:00.000' AS TIMESTAMP(3))

This sets the watermark for that table/stream to a very large value, which effectively makes that stream's watermark irrelevant: it will never be the minimum.
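
For context, this is roughly how the workaround would slot into the table definition from the question. It is a sketch: everything except the WATERMARK clause is unchanged from the original DDL:

tEnv.executeSql("CREATE TABLE TransportNetworkEdge_Kafka (" +
        "`timestamp` TIMESTAMP_LTZ(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
        "`urn` STRING," +
        "`flow_rate` INTEGER," +
        "PRIMARY KEY(`urn`) NOT ENFORCED," +
        // watermark pinned to a far-future value so this input never holds back
        // the join operator's (minimum-based) watermark
        "WATERMARK FOR `timestamp` AS CAST('9999-12-31 00:00:00.000' AS TIMESTAMP(3))" +
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'seneca.network.transport_network_edge'," +
        "'scan.startup.mode' = 'latest-offset'," +
        "'properties.bootstrap.servers' = 'localhost:9092'," +
        "'properties.group.id' = 'traffic-data-aggregation-job'," +
        "'format' = 'debezium-json'," +
        "'debezium-json.schema-include' = 'true'" +
        ")");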

The downside, however, is that this makes the join results non-deterministic.
