Flink - SQL Tumble window on event time returns no results



I have a Flink job that consumes from a Kafka topic and tries to create windows based on a few columns, such as eventId and eventName. The Kafka topic uses eventTimestamp as the timestamp field, with timestamps in milliseconds.

DataStreamSource<String> kafkaStream = env.fromSource(
        kafkaSource, // kafkaSource is built with the KafkaSource builder
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
        "KafkaSource");
// Doing some transformations to map to a POJO class.
Table kafkaTable = tableEnv.fromDataStream(
        kafkaStream,
        Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")
                // eventTimestamp is in millis
                .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTimestamp, 3)")
                .watermark("event_time", "event_time - INTERVAL '20' SECOND")
                .build());

When I use proc_time, the TUMBLE_END window query returns rows, but when I use event_time it returns nothing.

-- This query returns nothing
SELECT TUMBLE_END(event_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)

-- This query gives some results
SELECT TUMBLE_END(proc_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE)

I tried setting env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);, but it is deprecated, since I am using the stable 1.14.4 release.

I also tried adding a custom watermark strategy, but that didn't work either. I can't figure out what's causing this behavior. Can someone help?
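For context, a bounded-out-of-orderness strategy only advances the watermark as new records arrive: it tracks the maximum event timestamp seen and emits a watermark trailing it by the configured delay (minus 1 ms, as Flink does). A minimal stand-alone simulation of that logic, using the 20-second delay from the job above (plain Java, not the Flink API; the class name is just illustrative):

```java
// Stand-alone simulation of bounded-out-of-orderness watermarking (not Flink API).
// The watermark trails the max event timestamp seen by the configured delay,
// so it only moves forward when newer events arrive.
public class WatermarkSim {
    static final long OUT_OF_ORDERNESS_MS = 20_000; // matches Duration.ofSeconds(20)
    long maxTimestamp = Long.MIN_VALUE;

    long onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
        return currentWatermark();
    }

    long currentWatermark() {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    public static void main(String[] args) {
        WatermarkSim sim = new WatermarkSim();
        sim.onEvent(1647614467000L);
        sim.onEvent(1647614947000L);
        // Watermark after the latest event: 1647614947000 - 20000 - 1
        System.out.println(sim.currentWatermark()); // 1647614926999
    }
}
```

The consequence is that an event-time window can only fire once some event pushes the watermark past the window's end; the watermark never advances on its own.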

David - this is the code I am using.

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092")
        .setTopics("an-topic")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()
    val kafkaStream = env.fromSource(kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "KafkaSource")
    val kafkaRowMapper = kafkaStream.map(RowMapper())
    val finalTable = tableEnv.fromDataStream(kafkaRowMapper,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            // f2 is the event timestamp in millis
            .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
            .watermark("event_time", "event_time - INTERVAL '20' SECOND")
            .build()
    ).renameColumns(
        `$`("f0").`as`("eventId"),
        `$`("f1").`as`("eventName"),
        `$`("f3").`as`("eventValue")
    )
    tableEnv.createTemporaryView("finalTable", finalTable)
    val sqlQuery = "SELECT eventId, eventName, TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new, " +
        "LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
        "GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)"
    val resultTable = tableEnv.sqlQuery(sqlQuery)
    tableEnv.toDataStream(resultTable).print()
    env.execute("TestJob")
}

class RowMapper : MapFunction<String, Tuple4<String, String, Long, Float>> {
    override fun map(value: String): Tuple4<String, String, Long, Float> {
        val lineArray = value.split(",")
        return Tuple4(lineArray[0], lineArray[1], lineArray[2].toLong(), lineArray[3].toFloat())
    }
}

The Kafka topic has values like these:

event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32
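To see what should happen with this data, it helps to compute which 1-minute tumble window each timestamp lands in (window start = ts - ts % 60000) and when that window can fire: a window [start, start + 60 s) only emits once the watermark reaches its end. With the 20-second delay, the watermark after the last row (1647614947000) is 1647614926999, which is still before that row's window end (1647615000000), so the tail window stays open until newer data arrives. A quick stand-alone check (plain Java arithmetic, not Flink API; the class name is just illustrative):

```java
// Tumbling-window assignment arithmetic for TUMBLE(..., INTERVAL '1' MINUTE).
public class TumbleWindowCheck {
    static final long WINDOW_SIZE_MS = 60_000;

    // Start of the tumbling window that a timestamp falls into.
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    static long windowEnd(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        long lastRowTs = 1647614947000L;          // last sample row
        long watermark = lastRowTs - 20_000 - 1;  // bounded out-of-orderness, 20 s
        System.out.println(windowEnd(lastRowTs));               // 1647615000000
        System.out.println(watermark < windowEnd(lastRowTs));   // true: window can't fire yet
    }
}
```

That only explains the last one or two windows, though. Getting no results at all suggests the watermark never advances past its initial value in the first place, which points at idle source subtasks rather than the window logic.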

I added the line below after creating the table environment, and after that I was able to create windows using event_time:

tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5000 ms")
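That matches the usual cause: if the topic has fewer partitions than the source parallelism (or a partition stops receiving data), the idle subtasks never emit watermarks, so the combined watermark, which is the minimum across subtasks, never advances and no event-time window can close. `table.exec.source.idle-timeout` tells the planner to ignore sources that have been silent for that long. On the DataStream side, the analogous setting is `withIdleness` on the watermark strategy; a sketch (assumes the same `env`/`kafkaSource` as above, not runnable without the Flink runtime):

```java
// Sketch: mark source subtasks idle after 5 s so they stop holding back
// the combined watermark (DataStream-side analogue of the table option).
WatermarkStrategy<String> strategy =
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withIdleness(Duration.ofSeconds(5));

DataStreamSource<String> kafkaStream =
        env.fromSource(kafkaSource, strategy, "KafkaSource");
```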
