DataProc Cluster FetchFailedException



我将尽量在解释和设计参数时做到彻底和简洁。因此,我已经进行了几次初始迭代,但我根本不擅长Java(我最喜欢的编码语言是Python),也不了解如何设置集群的体系结构,使其不会挂起或失败。在高层次上,我有一个庞大的数据集(大约1.8万亿个数据点,120 TB的数据),其中有以Lat, Lon形式的位置数据。我正在使用Apache Sedona或GeoSpark(努力了解如何在我的python Pyspark代码中配置和使用它们)

我的工作流程:

  • 创建DataProc集群
  • 从GCS Bucket和BigQuery Table中加载数据(原始数据和一些参考数据集)
  • 做一些地理空间处理来提取点(例如,使用Lat, Lon来分配US State和US County)
  • 将新数据保存到GCS桶

所以我使用了一个大约几百个点的小数据样本。并且能够很好地做到这一点。当我试图运行整个数据集时,它似乎遇到了很多问题。

以下只是我在DataProc作业日志中看到的一些事情:

WARN org.apache.spark.sql.execution.datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 625 for reason Container marked as failed: container_1633477513137_0001_01_000626 on host: 
ERROR org.apache.spark.scheduler.cluster.YarnScheduler: Lost executor 625
WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 85581) (patino-pyspark-testing-sw-r96f.[<b>removed google info here</b>].internal executor 443): FetchFailed(BlockManagerId(598...
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ....
Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId[streamId=493540200000,chunkIndex=0]:

这些错误可以持续几天,我觉得这一切都归结为我只是不完全理解Hadoop和Spark的配置,我不知道我在做什么。

我正在采取这些方法,因为我的BQ操作在尝试处理那里时已经超时。

我真的很想解释一下:

传递JAR文件的正确方式(是在作业级别传递还是在创建集群时传递?)

如何正确安装/设置Sedona或GeoSpark,不关心使用哪一个只是想让它工作

任何和所有的设置/配置(仍然是新手,所以我对后续问题表示歉意),我是否在创建集群或提交作业时传递这些信息

排除任何日志。



我知道这是一堵文字墙,非常感谢所有帮助我的努力和评论。再次感谢!

虽然你关于集群架构的更一般的问题很可能超出了StackOverflow问答的范围,但有一些注意事项浮现在脑海中:

  1. 对于调试Spark作业,另一种访问运行时信息的方法是通过Spark的UI和HistoryServer,它将提供一些工作级别的日志,花费的时间,发送给不同工作的数据量,甚至堆栈跟踪。访问Spark UI的最佳方式是在创建集群时启用组件网关
  2. 你的日志信息提到主机名"patino-pyspark-testing-sw-r96f"-这似乎是一个"二级工人"默认使用可抢占虚拟机。虽然Dataproc尽最大努力使典型的工作负载在这些虚拟机类型中尽可能平稳地运行,但最终,可抢占虚拟机在设计上有点不可预测。您的VM很可能只是被另一个按需工作负载抢占了,从而导致临时故障。一些工作负载可以很好地处理worker失败的自动重试(特别是仅映射作业和具有最小外部依赖关系的作业),而其他工作负载则更敏感(如果BigQuery依赖关系使任务级重试变得更加困难,如果您有很多连接/shuffle数据,等等)。如果第一次尝试使工作负载脱离地面,您可能希望坚持按需VM类型,并且只有在您知道您的作业可以很好地容忍任务失败时才引入pvm。
  3. 如果spark需要shuffle/group/aggregate或cache/checkpoint中间数据,它将需要磁盘空间。此外,GCE vm的IO性能会根据磁盘大小进行调整,因此即使您不使用大量磁盘,小型磁盘也会非常慢。如果Spark要使用磁盘,您可能希望确保集群中的磁盘总量至少是输入数据集大小(120TB)的两倍。工人日志将给出一些指示,说明您是否有可能由于"磁盘不足"而失去工人。错误。

FetchFailedException是一个非常常见的错误,通常发生在具有可抢占虚拟机或自动伸缩但未启用EFM的集群中,详细信息请参阅本文档。要避免这个问题,要么避免pvm,要么避免自动伸缩;或在启用pvm或自动缩放时启用EFM。请注意,EFM在1.4和1.5上可用,到2021年10月还没有在2.0上可用。因此,如果你必须在2.0上使用Spark 3,你必须选择第一个选项。

此外,您还需要确保集群有足够的磁盘。对于120TB的输入数据,我将考虑50个主工作节点的集群形状,每个节点具有4TB的磁盘。