为什么 Flink 重试运算符不会失败?



我计划将我们的一个Spark应用程序迁移到Apache Flink。我正在尝试了解其容错功能。

我执行了以下代码,我没有看到 Flink 实际上尝试重试任何任务(或子任务(。这可能会导致我数据丢失。我应该怎么做才能确保 Flink 涵盖的所有故障?

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("file:///my-path", false))
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(0, TimeUnit.SECONDS) // delay
))
env.enableCheckpointing(10L)
val text = env.socketTextStream(hostName, port)
text
.map { input =>
List(input)
}.setParallelism(1)
.flatMap { input =>
println(s"running for $input")
List.fill(5)(input.head) ::: {
println("throw exception here")
throw new RuntimeException("some exception")
List("c")
}
}

我希望在屏幕上看到throw exception here消息几次。但是,当我使用fixedDelayRestart时,看起来它只是忽略了此消息并继续为其他人服务。

这取决于您如何启动应用程序。

我假设您正在从 IDE 中运行它。在这种情况下,StreamExecutionEnvironment.getExecutionEnvironment返回一个运行程序的LocalStreamExecutionEnvironment,并且单个进程中的所有 Flink,即 master(在 Flink JobManager 中(和 worker (TaskManager( 在同一个 JVM 进程中作为线程启动。异常将终止此单个进程。因此,没有 Flink 进程可以重新启动程序。

如果你想以容错运行程序,你需要把它提交到 Flink 环境,比如在本地机器上运行的环境。下载 Flink 发行版,解压归档文件,运行./bin/start-cluster.sh。这将启动两个进程,一个主进程和一个工作进程。然后,您可以通过创建具有StreamExecutionEnvironment.createRemoteEnvironment的远程执行环境并将主机名和端口作为参数传递到集群(有关详细信息,请查看文档(。

请注意,异常仍将终止工作进程。因此,为了能够重新启动程序,您需要手动启动工作进程。在生产环境中,这通常由 Kubernetes、Yarn 或 Mesos 负责。

顺便说一下,我们最近在 Flink 文档中增加了一个操作游乐场。这是一个基于 Docker 的沙盒环境,可以玩转 Flink 的容错功能。我建议看看:Flink Operations Playground。

更多提示:

  • 10ms 的检查点间隔非常短。
  • 文本套接字源不提供至少一次(或恰好一次(保证。记录最多处理一次。

相关内容

  • 没有找到相关文章

最新更新