Flink:为FlinkKafkaConsumer分配水印



我有一个FlinkKafkaConsumer定义如下FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties),我正在使用setStreamTimeCharacteristic(TimeCharacteristic.EventTime)来处理事件时间。

现在我想用函数assignTimestampsAndWatermarks分配一个周期性水印,但我不知道我应该传递给该函数什么,因为在文档中,该函数的示例接收带有getCreationTime()MyType类型的元素,我的使用者是 String 类型。

在这种情况下是否可以分配事件时间?

编辑:我想用作事件时间的时间是每个寄存器存储在Kafka中的时间。

EventTime的概念至少在定义中与创建事件而不是接收事件的时间严格相关。因此,如果您从 Kafka 使用的事件具有某种时间戳(例如,如果您将 JSON 作为字符串使用然后解析它(,则可以在assignTimestampsAndWatermarks函数中使用此时间戳。

如果您正在解析普通String对象,那么您可以做的最好的事情是使用自定义KafkaDeserializationSchema提取每个事件的 Kafka 时间戳并使用它。

从技术上讲,您甚至可以使用人为增加每条记录时间戳的计数器(例如,将其递增 1(,但这在EventTime处理方面似乎没有意义。

相关内容

  • 没有找到相关文章

最新更新