Use Cases of Flink CheckpointedFunction



在浏览Flink官方文档时,我遇到了CheckpointedFunction。想知道你为什么以及什么时候会使用这个功能。我目前正在做一个有状态的Flink工作,它严重依赖于ProcessFunction来保存RocksDB中的状态。只是想知道CheckpointedFunction是否比ProcessFunction更好。

CheckpointedFunction适用于需要使用应由Flink管理并包含在检查点中的状态,但不使用KeyedStream,因此无法像在KeyedProcessFunction中那样使用键控状态的情况。

CheckpointedFunction最常见的用例是在源和汇中。

除了@David的答案之外,我还有另一个用例,其中我不将CheckpointedFunctionsourcesink一起使用。我确实在ProcessFunction中使用了它,我想(以编程方式(计算我的工作重新启动的次数。我使用MyProcessFunctionCheckpointedFunction,并在作业重新启动时更新ListState<Long> restarts。我在集成测试中使用这种状态来确保作业在失败时重新启动。我的示例是基于Sinks的Flink检查点示例。

public class MyProcessFunction<V> extends ProcessFunction<V, V> implements CheckpointedFunction {
...
private transient ListState<Long> restarts;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception { ... }
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
restarts = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("restarts", Long.class));
if (context.isRestored()) {
List<Long> restoreList = Lists.newArrayList(restarts.get());
if (restoreList == null || restoreList.isEmpty()) {
restarts.add(1L);
System.out.println("restarts: 1");
} else {
Long max = Collections.max(restoreList);
System.out.println("restarts: " + max);
restarts.add(max + 1);
}
} else {
System.out.println("restarts: never restored");
}
}
@Override
public void open(Configuration parameters) throws Exception { ... }
@Override
public void processElement(V value, Context ctx, Collector<V> out) throws Exception { ... }
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<V> out) throws Exception { ... }
}

相关内容

  • 没有找到相关文章

最新更新