当检查点用于写入 Cassandra 的 Flink 作业中时,当此写入由于连接问题而失败时,作业将失败并在一定时间间隔后重新启动。
当记录失败时,此作业从哪里开始?它是选择要处理的下一条记录,还是重置偏移量并尝试重新处理失败的记录?
我的检查点配置如下所示,
try{
env.setStateBackend(new RocksDBStateBackend(props.getFlinkCheckpointDataUri(), true));
env.enableCheckpointing(10000, EXACTLY_ONCE); //10 seconds
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
catch(Exception e){
System.out.println("Failed to prepare stream execution environment");
}
如果启用了检查点,Flink 保证至少一次 Cassandra sink 的交付(如果你对 C* 实例的更新请求是幂等的,则恰好一次;这意味着可以在不更改结果的情况下多次应用更新([ref]。换句话说,如果记录执行失败,则不会提交包含这些记录的快照的检查点。因此,将完全重试失败检查点的记录。
这是有效的,因为 Cassandra sink 有一个检查点提交器,该提交程序将有关已完成检查点的其他信息存储在某个资源中。此信息用于防止在发生故障时完全重播上次完成的检查点 [ref]。