Apache Flink - 事件时间



我想在 Apache flink 中为我的事件创建一个事件时钟。我正在通过以下方式做

public class TimeStampAssigner implements AssignerWithPeriodicWatermarks<Tuple2<String, String>> {

private final long maxOutOfOrderness = 0; // 3.5 
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) {
currentMaxTimestamp = new  Date().getTime();
return currentMaxTimestamp;
}

@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);

}
}

请检查上面的代码并判断我是否正确。在事件时间和水印分配之后,我想处理流处理功能,在该函数中,我将为不同的键收集流数据 10 分钟。

不,这不是一个合适的实现。事件时间戳应该是确定性的(即可重现的(,并且应该基于事件流中的数据。相反,如果您要使用 Date((.getTime,那么您或多或少地使用了处理时间。

通常,在执行事件时间处理时,事件将具有时间戳字段,时间戳提取器将返回此字段的值。

您展示的实现将失去使用事件时间所带来的大部分好处,例如重新处理历史数据以重现历史结果的能力。

您的实现是实现对 Flink 系统的摄取时间,而不是事件时间。例如,如果您从 Kafka 消费,则previousElementTimestamp通常应指向向 Kafka 生成事件的时间(如果 Kafka 生产者没有说任何其他内容(,这将使您的流处理可重现。

如果你想在 Flink 中实现事件时间处理,你应该使用一些与你的元素相关的时间戳。它可以在元素本身内部(这对于时间序列有意义(或存储在 Kafka 中,并在previousElementTimestamp下可用。

关于maxOutOfOrderness,你可能还需要考虑Flink的侧输出功能,该功能可以在窗口创建后获取后期元素并更新Flink作业的输出。

如果您从 Kafka 消费并且想要一些简单的数据丢失事件时间处理实现,请使用 AscendingTimestampExtractor。 AscendingTimestampExtractor 存在一些潜在的问题,如果您的数据未在分区内排序,或者您在运算符之后而不是直接在 KafkaSource 之后应用此提取器,则可能会出现这些问题。 对于强大的工业用例,您应该在持久性日志存储中实现水印摄取,如 Google DataFlow 模型中所述。

最新更新