通过HDFS读取200GB CSV文件(pyspark)的调整Spark(YARN)集群



我现在正在使用一个运行在AWS上的11节点集群(1个主集群,10个工作集群-c3.4xlarge),我正试图从HDFS读取约200GB的.csv文件(只有大约10个实际的.csv文件)。

这个过程进行得非常缓慢。我正在看命令行的spark,它看起来是这样的。。

[Stage 0:>                                                      (30 + 2) / 2044]

每20秒增加+2个单位(即30+2到32+2到34+2等)的进度。所以这是非常需要改进的,否则我们将在这里呆大约11个小时,直到文件被读取。

这就是到目前为止的代码。。

# AMAZON AWS EMR
def sparkconfig():
    conf = SparkConf()
    conf.setMaster("yarn-client)    #client gets output to terminals
    conf.set("spark.default.parallelism",340)
    conf.setAppName("my app")
    conf.set("spark.executor.memory", "20g")
    return conf

sc = SparkContext(conf=sparkconfig(),
             pyFiles=['/home/hadoop/temp_files/redis.zip'])
path = 'hdfs:///tmp/files/' 
all_tx = sc.textFile(my_path).coalesce(1024)
... more code for processing

现在很明显,1024的分区可能是不正确的,那只是因为谷歌搜索和尝试了不同的东西。当谈到调整这份工作时,我真的不知所措。

AWS的工作节点是c3.4xlarge实例(我在集群中有10个),由30GB的RAM组成,每个RAM有16个vCPU。HDFS分区由集群中每个节点的本地存储组成,它是2x160GB SSD,所以我相信我们看到的是(2*160GB*10节点/3复制)=~1TB的HDFS。

.csv文件本身的大小从5GB到90GB不等。

为了说明它是否相关,Hadoop集群与spark集群在节点上是相同的。我将每个节点30GB中的20GB分配给spark执行器,将每个节点10GB留给OS+Hadoop/YARN等。名称节点/spark父节点是一个m3.xlarg,它有4个vcpu和16GB的RAM。

有人对我可能尝试加快文件读取过程的调优选项(或者其他任何东西)有什么建议吗?

Shamless Plug(作者)try Sparklenshttps://github.com/qubole/sparklens大多数时候,真正的问题不是应用程序是否缓慢,而是它是否会扩展。对于大多数应用程序来说,答案是有限的。

spark应用程序的结构对其可扩展性提出了重要的限制。一个阶段中的任务数量、阶段之间的依赖关系、偏差和在驱动程序端完成的工作量是主要的限制因素。

最新更新