作为执行器和线程数量的函数,spark中的分区数量是多少



我在EMR上使用Spark。我启动了一个集群,有时集群很小(在编写/测试代码时),比如5-10个实例。使用更多实例执行相同代码的其他时间,比如30-50。

我知道我可以访问配置来帮助设置分区数量,选择好数量的分区有助于运行时。

我想将分区的数量参数化为执行器数量和线程数量的函数:

val instanceCount = sc.getConf.get("spark.executor.instances").toDouble
val coreCount = sc.getConf.get("spark.executor.cores").toDouble

有没有人对此进行过研究,并能就参数化分区数量的好方法给出任何建议?

我意识到不会有一个好的答案,但一些带有常量的函数形式会有所帮助。例如:

val partitionCount = instanceCount*coreCount*0.7 

在我的用例中似乎很有效,并描述你的用例(执行者的数量/范围)会很有帮助。

在答案中,如果你能指出你所处理的实例的范围,那也会很有帮助。如果在某个地方有一个规范的调查,那么一个指向它的指针会很有帮助。

没有针对所有用例的单一优化配置,但我将向您介绍我在使用Spark的过程中收集到的所有启发式方法。

分区多于核心

首先,让我们陈述一下显而易见的情况。您需要拥有比核心更多的分区(=给定阶段中的任务),否则一些核心将无所事事。这个经验法则有例外吗?是:

  • 您还可以并行运行多个作业。假设您有1000个小数据集,并且需要独立于其他数据集对每个数据集应用一些转换。您可能不想将每个数据集划分为128k个文件,但是您可以并行运行128个分区的多个作业,以最大限度地增加核心数量。请注意,我只知道如何通过设置spark.scheduler.mode=FAIR在一个步骤内或在自定义托管的YARN集群上做到这一点。我从未尝试过提交并行的EMR步骤,我不知道这是否可能,这不是一个常规的YARN概念(但同样,如果你愿意,你可以在同一步骤中完成)
  • 您的任务本身就是并行的。这绝对不是一个常规的用例,我一般不建议使用,但我不得不在Spark上并行化一些MXNet分类代码。Java代码创建了一个Python进程,该进程使用MXNet进行预测,然后将结果返回给Java。由于MXNet是内部并行的,并且非常善于使用核心,我发现通过拥有尽可能多的机器(因此采用尽可能小的实例)和每台机器只有两个执行器(容器),吞吐量会高得多。每个执行器都创建了一个MXNet进程,用于服务4个Spark任务(分区),这足以最大限度地提高我的CPU使用率。在不限制MXNet进程数量的情况下,CPU始终固定在100%,并在上下文切换中浪费了大量时间

每个分区有大量数据

小分区会导致作业速度变慢,因为驱动程序和从属程序之间有一定的通信量,如果你有10万个小任务,这确实会花费很多时间。如果你的任务在1秒内完成,那么你的分区肯定太小了。

相反,大分区会危害内存,尤其是在洗牌期间。Shuffle对内存的要求很高,会让你进行大量的垃圾收集。如果您的分区太大,您将增加内存不足的风险,或者最多在GC中花费50%以上的时间。串行形式的2GB是分区大小的绝对限制,因为Spark使用由字节数组支持的Java IO实用程序,该数组只能容纳2^31 - 1(int的大小)元素。

一般来说,如果您正在进行shuffle(这里主要讨论JSON和文本数据),我建议使用gzipped形式的大约25MB-70MB。

广播

如果您需要将一些对象广播给所有执行器(例如,用于在对数据集进行混洗之前减小数据集大小的Bloom过滤器),则您想要的容器数量将取决于您愿意在每台机器上用于保存广播的内存量。事实上,每个执行器将广播一次对象,因此假设同构集群,则每台机器的广播数据量为object_size * num_executors / num_ec2_instances。网络成本也随着容器的数量而增加,因为对象需要多次广播到每个EC2实例。

然而,我遇到过这样的情况,我的广播对象是一个逻辑模型,它在分类过程中使用了一些内部可变状态。这意味着predict方法是同步的,并且我的容器中的所有线程都在努力访问这个锁。通过增加容器的数量(从而增加广播的内存和网络成本),我的工作速度提高了4倍。

摘要

分区的数量是由数据的大小决定的,而不是由可用内核的数量决定的。如果您的数据不需要超过200个分区,那么只需要不使用大于200个核心的集群,那么如果分区的大小已经相当合适,那么您可能不会从增加分区和核心的数量中获得任何有意义的速度。

只要你的数据大小合适,分区平衡,剩下的启发式方法只有:

  • 至少使用与分区一样多的内核
  • 如果您想增加吞吐量,但分区已经足够大,则并行运行多个作业(如果可以和)
  • 避免在任务中运行多线程代码,但在极少数需要的情况下,考虑使用比核心更少的分区,这与我们之前所说的相反
  • 容器越多,广播的成本就越高(广播期间的网络活动越多,内存使用率就越高,直到它被破坏)。如果您的广播对象是完全不可变的,请尝试使用尽可能少的容器。如果您的容器有一些内部状态并且需要锁定,那么每个容器有太多线程可能会增加争用并降低速度

最新更新