Flink cluster on Kubernetes: heartbeat between TaskManager and JobManager fails



I am trying to run a Flink cluster using native Kubernetes.

Below is my Flink configuration:

jobmanager.rpc.port: 6123
blob.server.port: 6124
taskmanager.rpc.port: 6122
taskmanager.numberOfTaskSlots: 10
jobmanager.memory.process.size: 5120m
jobmanager.memory.jvm-overhead.max: 512m
taskmanager.memory.process.size: 6656m
taskmanager.memory.framework.heap.size: 435m
taskmanager.memory.framework.off-heap.size: 217m
taskmanager.memory.jvm-overhead.max: 435m
kubernetes.jobmanager.cpu: 1
kubernetes.taskmanager.cpu: 1
# akka settings
akka.ask.timeout: 300s
akka.tcp.timeout: 1200s

# JVM configurations
env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:NativeMemoryTracking=summary -XX:+UseG1GC -Dkubernetes.websocket.ping.interval=300000"

# checkpoint config
execution.checkpointing.interval: 2min
execution.checkpointing.timeout: 30min # savepoint usually takes longer
execution.checkpointing.min-pause: 110s
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
execution.checkpointing.tolerable-failed-checkpoints: 2
execution.checkpointing.snapshot-compression: true
execution.savepoint.ignore-unclaimed-state: true
#execution.checkpointing.unaligned: true
# heartbeat settings
cluster.registration.initial-timeout: 1000
cluster.registration.max-timeout: 300000
cluster.services.shutdown-timeout: 300000
heartbeat.timeout: 120000
heartbeat.interval: 60000

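At first, the JobManager and the TaskManager start successfully, and the TM processes a small number of events. After the timeout interval, however, the JM throws a heartbeat error and starts a new TaskManager, as shown below. The old TaskManager keeps running, and the new one is started in the same namespace.

I cannot explain this behavior. I checked memory on both the TaskManager and the JobManager, and there is no memory problem. Neither the TM nor the JM crashes. The only symptom is that the TM is no longer recognized after the timeout elapses, and a new TM is started.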

2021-09-09 03:23:32,886 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Co-Process-Broadcast (10/10) (14679f59d1accdc6f925e2637eede0c9) switched from RUNNING to FAILED on flink-taskmanager-1-4 @ 100.114.72.234 (dataPort=41845).
java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id flink-taskmanager-1-4  timed out.
at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-runtime_2.12-1.12.1.jar:1.12.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.12-2.5.21.jar:2.5.21]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.12-2.5.21.jar:2.5.21]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [akka-actor_2.12-2.5.21.jar:2.5.21]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [akka-actor_2.12-2.5.21.jar:2.5.21]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.12-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.12-2.5.21.jar:2.5.21]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.12-2.5.21.jar:2.5.21]

I also checked the /etc/hosts file:

# Kubernetes-managed hosts file.
127.0.0.1       localhost
::1     localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
100.114.126.100 flink-5fd769dbcf-k4cnb
2620:149:106a:220d::7418        flink-5fd769dbcf-k4cnb

Do you know what is wrong here? Thank you.

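You may need to check how many CPU cores are configured for the JM and how many TMs there are. Could the JM be under high load, or is the JM executing some user logic code?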
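Alongside the JM load check, it may help to look at how much slack the heartbeat settings in the question leave. The sketch below is a hypothetical helper, not part of Flink: the names `parse_flat_conf` and `heartbeat_margin` are made up for illustration, and it assumes the configuration is a flat list of `key: value` lines with `#` comments, as in the question. It only computes how many heartbeat intervals fit into one timeout window. With the posted values the result is 2, which would suggest that very few heartbeats can be delayed before the timeout fires if the JM is slow to respond.

```python
# Hypothetical helper, not part of Flink: reads flat "key: value" lines in the
# style of the configuration in the question and reports the heartbeat margin.


def parse_flat_conf(text):
    """Parse flat 'key: value' lines, dropping '#' comments and blank lines.

    Assumption: values do not themselves contain a '#' character.
    """
    conf = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or ":" not in line:
            continue
        # Split on the first colon only, so values such as JVM options
        # that contain ':' are kept whole.
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def heartbeat_margin(conf):
    """Return (interval_ms, timeout_ms, whole intervals that fit in the timeout)."""
    interval = int(conf["heartbeat.interval"])
    timeout = int(conf["heartbeat.timeout"])
    return interval, timeout, timeout // interval


# Heartbeat values copied from the configuration in the question.
SAMPLE = """
# heartbeat settings
heartbeat.timeout: 120000
heartbeat.interval: 60000
"""

if __name__ == "__main__":
    interval, timeout, fits = heartbeat_margin(parse_flat_conf(SAMPLE))
    print(f"interval={interval} ms, timeout={timeout} ms, intervals per timeout={fits}")
```

This does not diagnose the failure by itself. It only makes the ratio between `heartbeat.timeout` and `heartbeat.interval` explicit, so it can be weighed together with the JM CPU and load questions above.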
