州生存时间.它是如何与Apache Flink CEP模式一起工作的



我阅读了Apache Flink关于State Time TO Live的文档https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-是时候活下去了
我不明白有两个时刻
1(
StateTTLConfig ttl = StateTtlConfig
.newBuilder(Time.minutes(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//And use in my Process Function 
valueStateDescriptor.enableTimeToLive(ttl);

如果我将在15:00在ValueState中放入一些元素,然后停止我的保存点作业,并且只有在17:00我才会从上一个保存点开始我的作业
价值状态会很清楚,对吗
2(如果我使用Apache Flink CEP模式:

.begin("a")
.where(simpleConditionA)
.followedBy("b")
.where(simpleConditionB)
.within(Time.minutes(60));

如果我会在15:00得到一个元素,然后停止我的保存点工作,只有在17:00我才会从上一个保存点开始我的工作。而得到B元素,图案不匹配对吗
它(ttl(是如何与Apache Flink CEP模式协同工作的
谢谢

我了解CEP,我真的在使用摄入时间。我将尝试解释:我使用带有ValueState和timerTime的Process Function,并在onTimer方法中清除状态。我放入state(keyedstate(中的一些元素,将计时器设置为1小时,并执行一些逻辑。基本上,值状态+计时器用作输出限制器(1小时内有1条输出消息(。在我的公司,我们需要停止在集群上运行作业(带有保存点(,然后几个小时后,我们需要从上一个保存点重新启动作业。现在我没有使用TTL,重新启动后,我的ValueState.value不为null。我希望在重新启动后不到一个小时内ValueState.value不为null(如果我在停止前放入状态(,但超过一个小时的状态总是为null
P.s我使用RrocksDb状态后端,间隔为1s的增量检查点。它非常有效。(

如果我要在15:00放入ValueState中的某个元素,然后用保存点停止我的作业,只有在17:00我才会从上一个保存点开始我的作业。价值状态会很清楚,对吗?

(1(这个ValueState实际上会消失,但我不确定它是否真的会消失。如果您的状态TTL配置包括cleanupFullSnapshot(),则如果您在16:00之后获取保存点,则可以保证保存点不会包括有问题的状态。但在这种情况下,这两件事似乎都不是真的,所以状态就在快照中。我不知道是在快照恢复过程中还是在下一次清理过程中删除了过期的状态。但是,由于您已经指定了NeverReturnExpired,因此它不会影响结果。

它(ttl(如何与Apache Flink CEP模式协同工作?

(2(CEP不使用状态TTL。CEP在可能影响模式匹配的时间内保持状态,并在不再需要状态时显式清除状态。从你表达这个问题的方式来看,我认为你使用的是处理时间,而不是事件时间。在这种情况下,该模式将不会在60分钟内匹配。但是,如果使用事件时间,那么水印将用于确定经过了多少时间,而停机时间对模式匹配没有影响。

更新:

我现在看到您正在使用摄取时间,并依靠计时器来清除状态。对于摄取时间,您可以选择使用事件时间或处理时间计时器。如果使用处理时间计时器,则在作业未运行时应激发的任何计时器都将在作业重新启动后立即激发。对于事件时间计时器,一旦水印到达计时器中的时间,它们就会启动。由于水印不保存在保存点中,因此在创建任何水印之前,必须对某些事件进行流动和处理(对于周期性水印,必须经过自动水印间隔(。

最新更新