Apache flink - 提前触发窗口实现问题 - 收到重复元素



我很难理解 flink 窗口原理,如果您能指出我正确的方向,我将非常高兴。

我的目的是计算时间间隔内的重复事件数,并在重复事件数大于阈值时生成警报事件。

据我了解,窗口是这种情况的完美匹配。

其他要求是在窗口中的重复事件计数为 2 时生成早期警报(即应在不等待窗口结束的情况下生成警报)。

我认为警报事件生成过程窗口函数可用于聚合窗口事件,并且可以使用自定义触发器根据重复事件计数(在水印到达窗口的结束时间戳之前)从窗口发出早期结果。

我正在使用事件时间语义,并且对自定义触发器有问题/疑问。

您可以在以下要点中找到实际的实现: https://gist.github.com/simpleusr/7c56d4384f6fc9f0a61860a680bb5f36

我正在使用键控状态来跟踪窗口中的元素计数encounteredElementsCountState

收到第一个元素后,我EventTimeTimer注册到窗口端。这应该触发窗口关闭并按预期工作的FIRE_AND_PURGE

如果计数超过阈值,我会尝试触发早期触发。这似乎也很成功,processwindow在这次触发后立即调用该函数。

问题是,我不得不在不了解原因的情况下插入下面的检查代码。因为之前收集的元素再次提供给onElement方法:

if (ctx.getCurrentWatermark() < 0) {
logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark()));
return TriggerResult.CONTINUE;
}

我想不通原因。我看到的是,发生这种情况时,水印值(ctx.getCurrentWatermark()) Long.MIN_VALUE(导致上述检查)。怎么会这样?

此检查似乎避免了重复的早期事件生成,但我不知道为什么会发生这种情况,并且此解决方法是否合适。

您能否告知为什么在窗口中处理相同的元素两次?

另一个问题是关于键控状态的用法。此实现在释放窗口后是否泄漏任何状态?我正在尝试以触发器的清晰方法清除所有使用的状态,但这足够吗?

问候。

每个任务都将 currentWatermark 初始化为 Long.MIN_VALUE,这仍然是 currentWatermark 的本地值,直到从该任务的所有输入流中到达更大的水印。希望知道这一点将帮助您更好地了解正在发生的事情。

就其价值而言,通常使用 ProcessFunction 实现这种逻辑比使用 Window API 更直接。

最新更新