我正在使用带有 scala 的 Flink 1.3.2 构建一个流媒体应用程序,我的 Flink 应用程序将监控一个文件夹并将新文件流式传输到管道中。文件中的每个记录都有一个关联的时间戳。我想使用此时间戳作为事件时间并使用AssignerWithPeriodicWatermarks[T]
构建水印,我的水印生成器如下所示:
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[Activity] {
val maxTimeLag = 6 * 3600000L // 6 hours
override def extractTimestamp(element: Activity, previousElementTimestamp: Long): Long = {
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssXXX")
val timestampString = element.getTimestamp
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(10000L)
val stream = env.readFile(inputformart, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)
val activity = stream
.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
.map { line =>
new tuple.Tuple2(line.id, line.count)
}.keyBy(0).addSink(...)
但是,由于我的文件夹那里有一些旧数据,我不想处理它们。并且旧文件中记录的时间戳> 6 小时,应该比水印早。但是,当我开始运行它时,我仍然可以看到创建了一些初始输出。我想知道水印的初始值是如何设置的,是在第一个间隔之前还是之后?可能是我误解了这里的某些东西,但需要一些建议。
您展示的管道中没有关心时间的运算符 - 没有窗口,没有ProcessFunction计时器 - 因此每个流元素都将畅通无阻地通过并进行处理。如果您的目标是跳过延迟的元素,则需要引入一些(以某种方式)将事件时间戳与当前水印进行比较的东西。
您可以通过在 keyBy 和 sink 之间引入一个步骤来做到这一点,如下所示:
...
.keyBy(0)
.process(new DropLateEvents())
.addSink(...)
public static class DropLateEvents extends ProcessFunction<...> {
@Override
public void processElement(... event, Context context, Collector<...> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
out.collect(event);
}
}
}
完成此操作后,您关于初始水印的问题变得相关。使用定期水印时,初始水印是Long.MIN_VALUE的,因此在发出第一个水印之前不会被视为延迟,这将在操作 10 秒后发生(考虑到您如何设置自动水印间隔)。
如果您想更详细地查看定期水印的生成方式,请在此处查看相关代码。
如果你想避免在前 10 秒内处理延迟元素,你可以简单地完全忘记使用事件时间和水印,只需修改上面显示的 processElement 方法,将事件时间戳与System.currentTimeMillis() - maxTimeLag
而不是当前水印进行比较。另一种解决方案是使用标点水印,并在第一个事件中发出水印。
或者更简单地说,您可以在flatMap或过滤器中检测并删除延迟事件,因为您定义的是相对于System.currentTimeMillis()而不是水印的延迟。