Flink - Connection unexpectedly closed by remote task manager



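I am using Flink v1.13.2 with one job manager and three task managers.

For some reason (I have not been able to find out why), the task managers keep losing their connections. This is the log I found: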

2022-02-17 21:19:55,891 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Print to Std. Out (13/32) (f0ff88713cc3ff5ce39e7073468abed4) switched from RUNNING to FAILED on 1.2.3.5:39309-f61daa @ server.name (dataPort=43421).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '1.2.3.4/1.2.3.4:43421'. This might indicate that the remote task manager was lost.
at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1106) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
2022-02-17 21:19:55,912 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task ba13f73bc80e020e655c42a6f182ba1b_12.
2022-02-17 21:19:55,927 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1919 tasks should be restarted to recover the failed task ba13f73bc80e020e655c42a6f182ba1b_12. 

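What could be the cause?

Update: I eventually found the error log below in the JobManager.

I am using standalone deployment mode.

  • How does Flink decide that no resources are available? (How is the maximum limit calculated?)
  • If I merge some of the process functions (or flatMap, filter, etc.) into a single process function, would that fix this issue?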
2022-02-18 12:31:00,404 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job FlinkStreamProcessing (c447c54ed67c5ca2be360fce46420fba) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=10, backoffTimeMS=10000)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1140) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1080) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:911) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.ExecutionVertex.markFailed(ExecutionVertex.java:472) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultExecutionVertexOperations.markFailed(DefaultExecutionVertexOperations.java:41) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskDeploymentFailure(DefaultScheduler.java:507) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$7(DefaultScheduler.java:492) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge$PendingRequest.failRequest(DeclarativeSlotPoolBridge.java:535) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:128) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:362) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:351) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:816) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.2.jar:1.13.2]
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632) ~[?:?]
... 32 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
... 28 more

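There are many possible causes: a TM losing its connection, a fault on the TM machine, the TM process exiting abnormally, excessive load on the JM, and so on.

The root cause cannot be determined from these log errors alone.

Could you also provide the TM error logs?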
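
On the slot question raised in the update: as far as I know, a standalone cluster offers (number of TaskManagers) x taskmanager.numberOfTaskSlots slots. With default slot sharing (all operators in the same slot sharing group), a job needs as many slots as its highest operator parallelism. If that holds for your job, merging process functions would not lower the slot requirement, because the requirement depends on parallelism rather than on the number of operators. Below is a minimal sketch of that arithmetic in plain Java, not Flink API code. The class and method names are made up for illustration, the value of 11 slots per TM is hypothetical, and the parallelism of 32 is taken from "(13/32)" in your first log.

```java
public class SlotCheck {

    // Slots offered by a standalone cluster:
    // number of TaskManagers x taskmanager.numberOfTaskSlots.
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Assumption: default slot sharing, so the job needs as many slots
    // as the highest parallelism among its operators.
    static int requiredSlots(int... operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    static boolean canSchedule(int available, int required) {
        return available >= required;
    }

    public static void main(String[] args) {
        int slotsPerTm = 11;                      // hypothetical value
        int required = requiredSlots(32, 16, 8);  // 32 from the log; 16 and 8 are hypothetical

        // All three TaskManagers alive.
        int healthy = availableSlots(3, slotsPerTm);
        System.out.println("3 TMs: " + healthy + " slots, schedulable="
                + canSchedule(healthy, required));

        // One TaskManager lost: the remaining slots may no longer cover the job.
        int degraded = availableSlots(2, slotsPerTm);
        System.out.println("2 TMs: " + degraded + " slots, schedulable="
                + canSchedule(degraded, required));
    }
}
```

Under these assumptions, once a TM is lost the remaining slots no longer cover the required parallelism, which would be consistent with the NoResourceAvailableException appearing after the restart attempts are used up. Why the TM was lost in the first place still has to come from the TM logs.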
