我是Flink和流媒体的新手。我正在使用大小为5Sec的滑动窗口和1Sec的滑动窗口来计算消息数量(检查下面的代码(,但我需要帮助保存(或打印(延迟到达的丢弃消息,我曾尝试使用sideOutputLateData,但它对我的不起作用
val sensorData = stream
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
.map(x => (x.event_id, x.user_id, 1))
.keyBy(x => (x._1, x._2))
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("_3")
带sideOutputLateData:
val lateOutputTag = OutputTag[(Int, Int, Int)]("late")
val sensorData = stream
.assignTimestampsAndWatermarks(new SensorTimeAssigner)
.map(x => (x.event_id, x.user_id, 1))
.keyBy(x => (x._1, x._2))
.timeWindow(Time.seconds(5), Time.seconds(1))
.sideOutputLateData(lateOutputTag)
.sum("_3")
sensorData
.getSideOutput(lateOutputTag)
.print()
一个可能的答案是实际上没有任何后期事件。如果您没有将时间特性设置为事件时间,则会出现这种情况。仅仅使用assignTimestampsAndWatermarks
是不够的,还需要使用
.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
强制Flink使用事件时间窗口,或通过确定事件时间窗口为默认窗口
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
否则,timeWindow
实际上是在构建SlidingProcessingTimeWindow
,在这种情况下,什么都不会迟到。
另一种可能性是,你的水印并没有像你预期的那样运行,而是让所有的事件都准时。