为状态检查点Flink sql



当我使用flink sql-api处理数据时。

重新启动应用程序,sum结果未保存在检查点中。它仍然以1开头。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new FsStateBackend("file:///D:/d_backup/github/flink-best-practice/checkpoint");
env.enableCheckpointing(1000 * 60);
env.setStateBackend(stateBackend);
Table table = tableEnv.sqlQuery(
"select sum(area_id) " +
"from rtc_warning_gmys " +
"where area_id = 1 " +
"group by character_id,area_id,group_id,platform");
//   convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> dsRow = tableEnv.toRetractStream(table, Row.class);
dsRow.map(new MapFunction<Tuple2<Boolean,Row>, Object>() {
@Override
public Object map(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception {
if(booleanRowTuple2.f0) {
System.out.println(booleanRowTuple2.f1.toString());
return booleanRowTuple2.f1;
}
return null;
}
});
env.execute("Kafka table select");

登录为:

12.3.。。。。。。100

重新启动应用程序它仍然启动:1.2.3.…

我认为总值将存储在检查点文件中,重新启动应用程序可以读取检查点的最后结果,如:

101102103。。。120

一些可能性:

  • 作业运行的时间是否足以完成检查点?仅仅因为作业产生了输出并不意味着检查点已经完成。我看到您将检查点设置为每分钟发生一次,检查点可能需要一些时间才能完成。

  • 工作是如何停止的?除非检查点已外部化,否则在取消作业时会删除检查点。

  • 作业是如何重新启动的?它是从检查点(自动)恢复的,还是从外部化的检查点或保存点恢复的,或者是从头开始重新启动的?

这类实验最容易通过命令行进行。例如,您可以使用

  1. 编写一个使用检查点并具有重新启动策略(例如env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000)))的应用程序
  2. 启动本地群集
  3. "flink run-d app.jar"启动作业
  4. 等待至少一个检查点完成
  5. "kill-9任务管理器PID"导致失败
  6. "taskmanager.sh start"以允许从检查点恢复作业

相关内容

  • 没有找到相关文章

最新更新