我们正在运行一个列表状态在 300GB 到 400GB 之间的作业,有时列表可能会增长到几千个。在我们的用例中,每个项目都必须有自己的 TTL,因此我们在 S3 上使用 RocksDB 后端为这个 ListState 的每个新项目创建一个新的计时器。
目前大约有 140+ 数百万个计时器(将在event.timestamp + 40 天触发(。
我们的问题是,作业的检查点突然卡住了,或者非常慢(几个小时内为 1%(,直到最终超时。它通常会在一段非常简单的代码上停止(flink 仪表板显示0/12 (0%)
,而前面的行显示12/12 (100%)
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
更多信息 :
- AT_LEAST_ONCE检查点模式似乎比EXACTLY_ONCE更容易卡住
- 几个月前,该州的数据量上升到1.5TB,我认为数十亿计时器没有任何问题。
- 运行两个任务管理器的计算机上的RAM,CPU和网络看起来正常
state.backend.rocksdb.thread.num = 4
- 第一个事件发生在我们收到大量事件(大约数百万分钟(时,而不是在前一个事件上。
- 所有事件都来自卡夫卡主题。
- 在AT_LEAST_ONCE检查点模式下,作业仍会正常运行和使用。
这是第二次发生在我们身上,拓扑运行得非常好,每天有数百万个事件,突然停止检查点。我们不知道是什么原因造成的。
任何人都可以想到什么会导致检查站突然卡住?
一些想法:
如果有许多计时器或多或少同时触发,则此计时器风暴将阻止任何其他事情发生 - 任务将循环调用onTimer,直到没有更多的计时器要触发,在此期间它们的输入队列将被忽略,并且检查点屏障将不会进行。
如果这是导致您遇到麻烦的原因,您可以向计时器添加一些随机抖动,以便事件风暴不会在以后变成计时器风暴。重新组织内容以使用状态 TTL 可能是另一种选择。
如果堆上有很多计时器,这可能会导致非常高的 GC 开销。这不一定会使作业失败,但会使检查点不稳定。在这种情况下,将计时器移动到 RocksDB 可能会有所帮助。
另外:由于您使用的是 RocksDB,因此从 ListState 切换到 MapState(以时间为键(可以让您删除单个条目,而无需在每次更新后重新序列化整个列表。(使用 RocksDB,MapState 中的每个键/值对都是一个单独的 RocksDB 对象。以这种方式使清理更有效可能是最好的补救措施。