我有一个FlinkKafkaConsumer定义如下FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)
,我正在使用setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
来处理事件时间。
现在我想用函数assignTimestampsAndWatermarks
分配一个周期性水印,但我不知道我应该传递给该函数什么,因为在文档中,该函数的示例接收带有getCreationTime()
的MyType
类型的元素,我的使用者是 String 类型。
在这种情况下是否可以分配事件时间?
编辑:我想用作事件时间的时间是每个寄存器存储在Kafka中的时间。
EventTime
的概念至少在定义中与创建事件而不是接收事件的时间严格相关。因此,如果您从 Kafka 使用的事件具有某种时间戳(例如,如果您将 JSON 作为字符串使用然后解析它(,则可以在assignTimestampsAndWatermarks
函数中使用此时间戳。
如果您正在解析普通String
对象,那么您可以做的最好的事情是使用自定义KafkaDeserializationSchema
提取每个事件的 Kafka 时间戳并使用它。
从技术上讲,您甚至可以使用人为增加每条记录时间戳的计数器(例如,将其递增 1(,但这在EventTime
处理方面似乎没有意义。