我正在阅读《使用Apache Flink进行流处理》一书,书中指出:"从0.10.0版本开始,Kafka支持消息时间戳。当从0.10或更高版本读取时,如果应用程序在事件时间模式下运行,消费者将自动提取消息时间戳作为事件时间戳*。"那么在processElement
函数中,调用context.timestamp()
默认情况下会返回kafka消息时间戳?能否请您提供一个简单的示例,说明如何实现AssignerWithPeriodicWatermarks/AssignerWithScoutatedWatermarks,该水印基于消耗的kafka消息时间戳提取(并构建水印(。
如果我使用TimeCharacteristic.ProcessingTime
,ctx.timestamp((会返回处理时间吗?在这种情况下,它会类似于context.timerService().currentProcessingTime()
吗。
谢谢。
Flink Kafka消费者会为您处理这一问题,并将时间戳放在需要的位置。在Flink 1.11中,您可以简单地依赖它,尽管您仍然需要注意提供一个指定无序(或断言时间戳是有序的(的WatermarkStrategy:
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.assignTimestampsAndWatermarks(
WatermarkStrategy.
.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
在Flink的早期版本中,您必须提供一个时间戳分配器的实现,它看起来像这样:
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
此版本的extractTimestamp
方法将StreamRecord中存在的时间戳的当前值作为previousElementTimestamp
传递,在这种情况下,它将是Flink Kafka消费者放置的时间戳。
Flink 1.11文档
Flink 1.10文档
至于ctx.timestamp()
在使用TimeCharacteristic.ProcessingTime
时返回的内容,在这种情况下,此方法返回NULL。(从语义上讲,是的,时间戳似乎是当前处理时间,但它不是这样实现的。(