Upgrading to AWS Graviton-based EC2 instances in EMR causes task failures



I'm running a Spark Scala job in EMR that I'm working to improve. As of now it runs on m5.8xlarge without any issues. I recently tried upgrading to the Graviton-based EC2 instance m6g.8xlarge, and while the job did succeed, I'm seeing some strange problems: tasks failing due to timeouts, stages running in an odd order, and what looks like memory pressure. The stages running out of order are the ones where the tasks fail: stage 6 runs and fails, then stages 4 & 5 complete, then the stage 6 retry succeeds. In the current working m5.8xlarge runs, stages 4 & 5 are skipped. I don't know why this is happening, since the only change I made was switching the instance type from m5 to m6g, so I'd like to know if anyone has run into something similar or has a solution. I'll also post some of the errors from the failed tasks, though I think they may be OOM-related.

Here is the main error I'm seeing:

ERROR TransportClientFactory:261 - Exception while bootstrapping client after 60041 ms
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:263)
at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:116)
at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:89)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:257)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at org.apache.spark.network.shuffle.ExternalShuffleClient.lambda$fetchBlocks$0(ExternalShuffleClient.java:100)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:109)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:264)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:614)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:609)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:442)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:160)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:66)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:173)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:259)
... 39 more

I don't think this is an out-of-memory problem. The m6g.8xlarge and the m5.8xlarge both have the same amount of memory (128 GiB); the exact specs are here: https://aws.amazon.com/ec2/instance-types/m6g and https://aws.amazon.com/ec2/instance-types/m5
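
Since the bootstrap gives up after roughly 60 s, one thing I'm going to try is raising Spark's network/shuffle timeouts. A minimal sketch of the overrides (these are standard Spark properties, but the specific values are just my guesses, not a confirmed fix):

```shell
# Raise network-related timeouts to rule out a slow auth handshake on the
# Graviton nodes. Values below are guesses I'm experimenting with.
spark-submit \
  --conf spark.network.timeout=300s \
  --conf spark.shuffle.io.maxRetries=6 \
  --conf spark.shuffle.io.retryWait=15s \
  --conf spark.shuffle.registration.timeout=60000 \
  "$@"   # rest of my job's arguments
```

If the failures stop with larger timeouts, that would at least confirm the handshake is slow rather than broken on m6g.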

Looking at the traceback, the timeout happens during authentication:

First, it fails to authenticate using Spark's auth protocol in doBootstrap (AuthClientBootstrap.java:89, https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java#L99):

Bootstraps a {@link TransportClient} by performing authentication using Spark's auth protocol. This bootstrap falls back to using the SASL bootstrap if the server throws an error during authentication, and the configuration allows it. This is used for backwards compatibility with external shuffle services that do not support the new protocol.

Then it also fails to authenticate via SASL in doBootstrap (SaslClientBootstrap.java:70, https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java#L54):

  • Bootstraps a {@link TransportClient} by performing SASL authentication on the connection. The server should be set up with a {@link SaslRpcHandler} with matching keys for the given appId.
  • Performs SASL authentication by sending a token, and then proceeding with the SASL challenge-response tokens until it either successfully authenticates or throws an exception due to a mismatch.
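
For context, as far as I can tell from the Spark docs, the auth path in this trace is governed by the properties below (auth must be enabled on my cluster already, since the SASL bootstrap is being exercised). Listing my understanding here in case the fallback behavior is relevant:

```shell
# Spark properties controlling the handshake in the stack trace (my reading of the docs):
spark.authenticate=true                  # enables RPC authentication between components
spark.network.crypto.enabled=true        # try Spark's newer auth protocol first (AuthClientBootstrap)
spark.network.crypto.saslFallback=true   # fall back to SASL if the new protocol fails (default true)
```

In my trace both legs time out, so the fallback itself seems to be working as documented; it's the connection to the external shuffle service that never completes.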
