我正在尝试使用EventTime
设置流媒体解决方案。我将从卡夫卡消费。我的数据最多可以延迟一分钟(所以我的水标记延迟一分钟(并且顺序不正常。 我有 30 秒的窗口。 我有以下设置:
KafkaConsumer.scala
object KafkaConsumer {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val properties = getServerProperties
val consumer = new FlinkKafkaConsumer010[ObjectNode](getKafkaTopic, new JsonNodeDeserializationSchema, properties)
consumer.setStartFromGroupOffsets()
val stream = env.addSource(consumer)
.assignTimestampsAndWatermarks(new WMAssigner)
stream
.keyBy { jsonEvent =>
val key = jsonEvent.findValue("key").toString replaceAll(""","")
key.toString
}
.window(TumblingEventTimeWindows.of(Time.seconds(30)))
.process { new SessionWindowProcessor }
.print
env.execute("EventTime Test")
}
}
WMAssigner.scala
class WMAssigner extends AssignerWithPeriodicWatermarks[ObjectNode] {
var currentMaxTimestamp: Long = 0
var currentWaterMark: Long = 0
override def extractTimestamp(element: ObjectNode, previousElementTimestamp: Long): Long = {
val lStr = element.findValue("ts").toString replaceAll(""", "")
currentMaxTimestamp = if(currentMaxTimestamp > lStr.toLong) currentMaxTimestamp else lStr.toLong
currentMaxTimestamp
}
override def getCurrentWatermark: Watermark = {
currentWaterMark = if(currentMaxTimestamp - 60000 < 0) 0 else currentMaxTimestamp - 60000
new Watermark(currentWaterMark)
}
}
SessionWindowProcessor.scala
class SessionWindowProcessor extends ProcessWindowFunction[ObjectNode,Long,String,TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[Long]): Unit = {
println("Processing!")
var maxVal: Long = 0
elements.foreach( value => {
val valStr = value.findValue("value").toString replaceAll(""","")
maxVal = if(valStr.toLong > maxVal) valStr.toLong else maxVal
})
out.collect(maxVal)
}
}
示例数据:
"{"ts": 0, "key": "a", "value": 0}",
"{"ts": 15000, "key": "a", "value": 1}",
"{"ts": 30000, "key": "a", "value": 2}",
"{"ts": 90001, "key": "a", "value": 3}"
我希望在第四条消息出现后,我的第一个窗口会触发,值为1
或2
(我不确定现在包含是如何工作的(。 不幸的是,我什至没有看到SessionWindowProcessor.scala
射击中的println
。 我在这里做错了什么导致我的窗口失败吗?
对于它的价值,你的extractTimestamp()
方法应该返回 lStr.toLong,而不是 currentMaxTimestamp。这种方法跟踪当前MaxTimestamp,以便值可用于水印生成是有道理的,但extractTimestamp的作用是向Flink提供每个流元素的事件时间时间戳。
但是,我不相信这可以解释为什么您看不到任何输出 - 如果您的数据实际上是按时间戳排序的,则当然不会,如示例中所示。
另请注意,您可以使用 BoundedOutOfOrdernessTimestampExtractor,它使用起来更简单一些。
你的 Kafka 主题有多少个分区?如果它有多个分区,问题是 Flink 必须为每个分区查看超过90000
的时间戳,以便发出值为30000
的水印。因此,您要么必须添加更多数据,以便每个分区都有一个时间戳大于90000
的元素,要么将 Kafka 主题的分区数设置为1
。下面是有关水印和 Kafka 连接器的详细信息。