我有两个具有以下配置的mapr集群,
cluster 1: hosted on aws, 3 nodes with 32g of memory/32 cores each
cluster 2: hosted on bare-metal servers, 8 nodes with 128g of memory/32 cores each
我正在两个集群上运行以下pyspark代码
df=hc.sql("select * from hive_table")
df.registerTempTable("df")
df.cache().count()
for: 100times
result=hc.sql('select xxxx from df')
result.write.saveAsTable('some hive table', mode='append')
上面的代码在spark中提交了100个新作业(在yarn上运行)。在集群1上,整个操作在30分钟内完成,但在规模较大的集群2上,完成相同操作需要90分钟。经过检查,我发现尽管每个作业花费的时间几乎相同(在集群2中稍微快一点),但在集群2中,每个作业之间的时间远远高于1。
可能的原因,
- 驱动程序和执行程序节点之间的延迟?--我正在
纱线客户端模式下运行 - 驱动器内存不足,或者我在纱线上启动火花簇的方式是错误的
如何提交作业?
Cluster 1: /opt/mapr/spark/spark-1.6.1/bin/spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-memory 10g --executor-cores 5 --driver-memory 10g --driver-cores 10 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="100" --queue default
Cluster 2: /opt/mapr/spark/spark-1.6.1/bin/spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-memory 80g --executor-cores 28 --driver-memory 25g --driver-cores 25 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="100" --queue default
PS:只粘贴了部分代码。代码中还有其他模块。总的来说,集群2处理代码的速度是集群1的3倍,所以我不认为"一般"速度有问题。
我的问题更具体地说是两份工作之间的"时间"。例如,上面的代码运行了100个spark-sql作业,每个作业在集群2中平均占用2秒,在集群1中平均占用5秒。与集群1相比,集群2中每个作业之间的时间太长。
在您的伪代码中,我没有看到任何与驱动程序相关的操作(假设执行器将数据保存到分布式FS)
请注意:
- 您
df.cache()
,但似乎没有使用缓存的df - 您的yarn客户端配置似乎有问题
看起来您正在尝试使用比可用的更多的执行器内存和内核。
在集群#1中,有3个节点具有32GB的RAM,您的执行代码为:--num-executors 10 --executor-memory 10g
在最佳情况下,您将有9个执行器,每个执行器具有10GB的RAM。每个节点上最多有3个执行器。我假设每个节点只能执行2个执行器(从32GB的RAM中,超过2GB将用于纱线、开销等,因此剩下不到29GB===>每个10GB的2个容器)
===>集群#1将有6到9个执行者
在集群#2中,有5个节点具有128GB的RAM,您的执行代码为:--num-executors 10 --executor-memory 80g
在最佳情况下,您将有5个具有80GB RAM的执行器。一个节点上的每个执行器。
由于集群#1有更多的执行器(即使它们更小),它可能会运行得更快(取决于您的应用程序)
减少集群#2中的执行器内存和核心,同时增加执行器的数量,应该会提供更好的性能。