如何从代码重新启动flink作业



这似乎是一个非常简单的问题strong>

作为参考,也有类似的帖子:从代码中取消Apache Flink作业,但是它没有告诉您如何获得JobManager,该方法可能会有所帮助。

有人可以在此上散发一些灯光?

我认为,通过代码取消Flink作业的最简单方法是使用REST API。请参阅:https://ci.apache.org/projects/flink/flink/flink-docs-release-1.2/monitoring/rest_api.html#job-cancellation

然后,您可以在Flink-Code的主类中定义重新启动策略。喜欢

final int restartAttempts = configuration.getInteger(RESTART_ATTEMPTS, 3);
final int delayBtwAttempts = configuration.getInteger(RESTART_DELAY_IN_MILLIS, 3000);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(fixedDelayRestart(restartAttempts, delayBtwAttempts));

请参阅:https://ci.apache.org/projects/flink/flink/flink-docs-rease-1.2/dev/restart_strategies.html

我们可以将配置传递给Flink Env,例如:

Configuration configuration = new Configuration();
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, "//savepointPath");
configuration.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        flinkProperties.getIp(),
        flinkProperties.getPort(), configuration);

相关内容

  • 没有找到相关文章

最新更新