Apache Flink翻滚窗口延迟了结果



在使用翻滚窗口的apache flink应用程序中遇到问题。窗口大小是10秒,我希望每10秒有一个resultSet DataStream。然而,当最新窗口的resultSet总是被延迟时,除非我将进一步的数据推送到源流。

例如,如果我在"01:33:40.0"one_answers"01:34:00.0"之间将多条记录推送到源流,然后停下来查看日志,则不会发生任何事情。

我在"01:37:XX"再次推送了一些数据,然后将在"01:33:40.0"one_answers"01:34:00.0"之间获得窗口的resultSet,这是不期望的,因为下游接收器逻辑期望resultSet准时。

如有任何改进建议,我们将不胜感激。谢谢

下面是日志:

"log timestamp": "2019-11-15 01:37:45",
"message": "resultSet output: CLASS: 13 CNT: 1 from: 2019-11-15 01:33:40.0 to: 2019-11-15 01:34:00.0n",

以下是代码片段:

Table resultTable = tableEnv.sqlQuery(""+
"SELECT " +
"  CAST (N02_001 AS VARCHAR(10)) AS RAILWAY_CLASS, " +
"  COUNT(*) RAILWAY_CLASS_COUNT, " +
"  TUMBLE_START(rowtime, INTERVAL '20' SECOND) as WINDOW_START, " +
"  TUMBLE_END(rowtime, INTERVAL '20' SECOND) as WINDOW_END " +
" FROM Inputs " +
" GROUP BY TUMBLE(rowtime, INTERVAL '20' SECOND), CAST (N02_001 AS VARCHAR(10))");

TupleTypeInfo<Tuple4<String, Long, Timestamp, Timestamp>> tupleType = new TupleTypeInfo<>(
Types.STRING,
Types.LONG,
Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP);
DataStream<Tuple4<String, Long, Timestamp, Timestamp>> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
resultSet
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
String output = "CLASS: " + value.f0 + " CNT: " + value.f1 + " from: " + value.f2 + " to: " + value.f3 + "n";
log.warn("resultSet output: " + output);
return value;
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP));

这是预期的行为,您使用的是EventTime,这意味着用于关闭窗口和跟踪应用程序中时间流的水印来自事件时间戳。这意味着,如果没有事件,就不会有时间流,因此现在将生成窗口。这就是你所观察到的。

您所经历的行为很可能来自于您使用的AssignerWithPunctuatedWatermark,它为每个事件发送时间戳和水印。如果切换到AssignerWithPeriodicWatermark,即使没有数据,也应该生成水印,并关闭&发射窗口。

这是因为您的水印保留在"01:33:40.0";不会触发正在进行的窗口的关闭操作。

然后,当您在"01:37:XX"发送事件时,这将向前推送水印,您将获得计算结果。

最新更新