这似乎是一个非常简单的问题strong>
作为参考,也有类似的帖子:从代码中取消Apache Flink作业,但是它没有告诉您如何获得JobManager,该方法可能会有所帮助。
有人可以在此上散发一些灯光?
我认为,通过代码取消Flink作业的最简单方法是使用REST API。请参阅:https://ci.apache.org/projects/flink/flink/flink-docs-release-1.2/monitoring/rest_api.html#job-cancellation
然后,您可以在Flink-Code的主类中定义重新启动策略。喜欢
final int restartAttempts = configuration.getInteger(RESTART_ATTEMPTS, 3);
final int delayBtwAttempts = configuration.getInteger(RESTART_DELAY_IN_MILLIS, 3000);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(fixedDelayRestart(restartAttempts, delayBtwAttempts));
请参阅:https://ci.apache.org/projects/flink/flink/flink-docs-rease-1.2/dev/restart_strategies.html
我们可以将配置传递给Flink Env,例如:
Configuration configuration = new Configuration();
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, "//savepointPath");
configuration.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
flinkProperties.getIp(),
flinkProperties.getPort(), configuration);