我们在 Kubernetes 上使用 Apache Flinkjob cluster
,它由一个Job Manager
和两个Task Managers
组成,每个有两个插槽。群集是使用Lightbend Cloudflow
框架部署和配置的。
我们还使用RocksDB
状态后端和与 S3 兼容的存储来实现持久性。考虑从 CLI 创建savepoints
没有任何问题。我们的工作由几个键控状态(MapState
(组成,并且往往相当大(我们预计每个状态至少 150 Gb(。作业的Restart Strategy
设置为Failure Rate
。我们在整个工作中都使用Apache Kafka
作为源头和下沉。
我们目前正在进行一些测试(主要是PoC(,有几个问题挥之不去:
我们做了一些综合测试,并将不正确的事件传递给了作业。这导致Exceptions
是在处决期间被抛出的。由于Failure Rate
策略,将发生以下步骤: 来自 Kafka 的损坏消息通过源 -> 读取 运算符尝试处理事件并最终抛出Exception
-> 作业重新启动并从 Kafka 读取与之前的步骤相同的记录 -> 运算符失败 ->Failure Rate
最终超过给定值,作业最终停止。接下来我应该怎么做?如果我们尝试重新启动作业,它似乎将使用最新的 Kafka 消费者状态恢复,并将再次读取损坏的消息,从而将我们带回前面提到的行为?解决此类问题的正确步骤是什么?Flink 是否使用任何所谓的Dead Letter Queues
?
另一个问题是关于检查点和恢复机制。我们目前无法弄清楚在作业执行期间引发的哪些异常被视为关键异常,并导致作业失败,然后从最新检查点自动恢复?如前面的情况所述,作业内部引发的普通Exception
会导致连续重启,最终导致作业终止。我们正在寻找一种情况,以便在集群发生导致从最新检查点自动恢复时发生某些事情(Job Manager
失败、Task Manager
失败或其他情况(时重现。考虑到 Kubernetes 集群中的这种情况,欢迎任何建议。
我们已经沉浸在 Flink 官方文档中,但没有找到任何相关信息,或者可能以错误的方式感知它。非常感谢!
Flink 的 Kafka反序列化器采用的方法是,如果deserialize
方法返回 null,那么 Flink Kafka 消费者将静默跳过损坏的消息。如果它抛出一个IOException
,管道将重新启动,这可能会导致失败/重新启动循环,如您所指出的。
这在文档本节的最后一段中进行了描述。
过去关于这个主题的工作和讨论可以在 https://issues.apache.org/jira/browse/FLINK-5583 和 https://issues.apache.org/jira/browse/FLINK-3679 以及 https://github.com/apache/flink/pull/3314 中找到。
死信队列将是一个很好的改进,但我不知道在这方面有任何努力。(目前,进程函数的端输出是实现死信队列的唯一方法。