Flink作业在10分钟后中断



我有一个带有全局窗口和自定义流程的flink作业
流程在大约10分钟后失败,出现下一个错误:

java.io.InterruptedIOException

这是我的工作:

SingleOutputStreamOperator<CustomEntry> result = stream
.keyBy(r -> r.getId())
.window(GlobalWindows.create())
.trigger(new CustomTriggeringFunction())
.process(new CustomProcessingFunction());

CustomProcessingFunction运行很长一段时间(超过10分钟(,但10分钟后,进程将停止,并在InterruptedIOException上失败

是否可以增加flink作业的超时时间

从Flink的角度来看,用户函数运行的时间太长了。这个窗口处理函数在做什么,需要10分钟以上的时间?也许您可以将其重新构造为使用异步i/o运算符,这样就不会完全阻塞管道。

也就是说,10分钟是默认的检查点超时间隔,并且您正在阻止检查点在该功能运行时完成。所以你可以尝试增加execution.checkpointing.timeout。如果由于检查点超时而导致工作失败,那将有所帮助。或者可以将execution.checkpointing.tolerable-failed-checkpoints从默认值(0(增加。

最新更新