My Flink作业运行了几天没有任何问题,但几天后它会杀死tm并重新启动整个作业。在我发现的日志中,org.apache.flink-contrib.streaming.state.RocksDBKeyedStateBackend-删除现有实例基本目录/tmp/flink-io-4b455ef-bcde-4ef2-ed3-c66ca9d8933e/job_152b986e7e5a6f411780849f13ce4bc8_op_KeyedProcessOperator_a1c286a47e97622aa92a8f6cd4115854__1_4_uid_4b53ff24-e240-48d6-b438-3ab2d05cbdb8
在它删除了statestore文件后,它抛出了这个错误,因为我正在从那里的statestore获取数据。
java.lang.NullPointerException
at c.c.w.d.s.b.aggregator.StateProcessFunction.addEvent(StateProcessFunction.java:81)
at c.cs.w.d.s.b.a.StateProcessFunction.processElement(StateProcessFunction.java:113)
at c.c.w.d.s.b.a.StateProcessFunction.processElement(ContactStateProcessFunction.java:26)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
问题是何时以及为什么启动flink后端状态存储文件删除?日志中有没有打印的东西?
在Flink的最新版本中,该消息被重新编写为更清晰:
关闭RocksDB状态后端。正在清理RocksDB工作目录{}。
这发生在RocksDBKeyedStateBackend.dispose()
中,当Flink关闭时调用它。Flink仅依赖于RocksDB的瞬态。从故障中重新启动后,将从最近的检查点创建新的RocksDB实例。(尽管如果你使用本地恢复,情况会有点复杂。(
你用的是哪个版本的Flink?增量检查点还是完整检查点?本地恢复?也许你遇到了一个已经修复的错误。
这是因为并发性。mapstate.get((抛出异常,有时当rocksdbackend异步线程删除过期数据时,它会锁定文件,同时如果您尝试获取它,则会抛出异常,导致tm重新启动。