事件时间窗口未触发



我正在尝试使用EventTime设置流媒体解决方案。我将从卡夫卡消费。我的数据最多可以延迟一分钟(所以我的水标记延迟一分钟(并且顺序不正常。 我有 30 秒的窗口。 我有以下设置:

KafkaConsumer.scala

object KafkaConsumer {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val properties = getServerProperties
val consumer = new FlinkKafkaConsumer010[ObjectNode](getKafkaTopic, new JsonNodeDeserializationSchema, properties)
consumer.setStartFromGroupOffsets()
val stream = env.addSource(consumer)
.assignTimestampsAndWatermarks(new WMAssigner)
stream
.keyBy { jsonEvent =>
val key = jsonEvent.findValue("key").toString replaceAll(""","")
key.toString
}
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.process { new SessionWindowProcessor }
.print
env.execute("EventTime Test")
}
}

WMAssigner.scala

class WMAssigner extends AssignerWithPeriodicWatermarks[ObjectNode] {
var currentMaxTimestamp: Long = 0
var currentWaterMark: Long = 0
override def extractTimestamp(element: ObjectNode, previousElementTimestamp: Long): Long = {
val lStr = element.findValue("ts").toString replaceAll(""", "")
currentMaxTimestamp = if(currentMaxTimestamp > lStr.toLong) currentMaxTimestamp else lStr.toLong
currentMaxTimestamp
}
override def getCurrentWatermark: Watermark = {
currentWaterMark = if(currentMaxTimestamp - 60000 < 0) 0 else currentMaxTimestamp - 60000
new Watermark(currentWaterMark)
}
}

SessionWindowProcessor.scala

class SessionWindowProcessor extends ProcessWindowFunction[ObjectNode,Long,String,TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[Long]): Unit = {
println("Processing!")
var maxVal: Long = 0
elements.foreach( value => {
val valStr = value.findValue("value").toString replaceAll(""","")
maxVal = if(valStr.toLong > maxVal) valStr.toLong else maxVal
})
out.collect(maxVal)
}
}

示例数据:

"{"ts": 0,     "key": "a", "value": 0}",
"{"ts": 15000, "key": "a", "value": 1}",
"{"ts": 30000, "key": "a", "value": 2}",
"{"ts": 90001, "key": "a", "value": 3}" 

我希望在第四条消息出现后,我的第一个窗口会触发,值为12(我不确定现在包含是如何工作的(。 不幸的是,我什至没有看到SessionWindowProcessor.scala射击中的println。 我在这里做错了什么导致我的窗口失败吗?

对于它的价值,你的extractTimestamp()方法应该返回 lStr.toLong,而不是 currentMaxTimestamp。这种方法跟踪当前MaxTimestamp,以便值可用于水印生成是有道理的,但extractTimestamp的作用是向Flink提供每个流元素的事件时间时间戳。

但是,我不相信这可以解释为什么您看不到任何输出 - 如果您的数据实际上是按时间戳排序的,则当然不会,如示例中所示。

另请注意,您可以使用 BoundedOutOfOrdernessTimestampExtractor,它使用起来更简单一些。

你的 Kafka 主题有多少个分区?如果它有多个分区,问题是 Flink 必须为每个分区查看超过90000的时间戳,以便发出值为30000的水印。因此,您要么必须添加更多数据,以便每个分区都有一个时间戳大于90000的元素,要么将 Kafka 主题的分区数设置为1。下面是有关水印和 Kafka 连接器的详细信息。

相关内容

  • 没有找到相关文章

最新更新