我正在Amazon Elastic Map Reduce(EMR)上的Apache Spark上运行一个作业。目前我在emr-4.1.0上运行,它包括AmazonHadoop2.6.0和Spark 1.5.0。
当我启动作业时,YARN正确地将所有工作节点分配给了spark作业(当然,其中一个用于驱动程序)。
我将神奇的"maximizeResourceAllocation"属性设置为"true",而spark属性"spark.dynamicAllocation.enabled"也设置为"true"。
然而,如果我通过将节点添加到工作机器的CORE池来调整emr集群的大小,YARN只会将一些的新节点添加到spark作业中。
例如,今天早上我有一项工作,使用了26个节点(m3.2xlarge,如果重要的话)——1个用于驱动程序,25个执行程序。我想加快这项工作的速度,所以我尝试再添加8个节点。YARN已经获得了所有的新节点,但只将其中的一个分配给了Spark作业。Spark确实成功地获取了新节点,并将其用作执行器,但我的问题是,为什么YARN让其他7个节点闲置?
这很烦人,原因很明显——即使资源没有被使用,我也必须为它们付费,而且我的工作根本没有加快!
有人知道YARN是如何决定何时将节点添加到正在运行的spark作业中的吗?哪些变量起作用?记忆力V型芯?任何东西
提前感谢!
好吧,在@sean_r_owen的帮助下,我找到了这个。
问题是:当将spark.dynamicAllocation.enabled
设置为true
时,不应该设置spark.executor.instances
-它的显式值将覆盖动态分配并将其关闭。事实证明,如果您自己不设置,EMR会在后台设置它。要获得所需的行为,需要显式地将spark.executor.instances
设置为0。
对于记录,以下是我们在创建EMR集群时传递给--configurations
标志的一个文件的内容:
[
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.dynamicAllocation.enabled": "true",
"spark.executor.instances": "0"
}
}
]
这为我们提供了一个EMR集群,Spark在运行作业时使用所有节点,包括添加的节点。它似乎还使用了所有/大部分内存和所有(?)核心。
(我不完全确定它是否使用了所有实际的核心;但它肯定使用了超过1个VCore,这是以前没有的,但根据Glennie Helles的建议,它现在表现更好,使用了一半列出的VCore,似乎等于实际的核心数量…)
我使用emr-5.20.0在几乎相同的设置下观察到了相同的行为。当集群已经在运行时,我没有尝试添加节点,而是使用TASK节点(只有一个CORE节点)。我使用InstanceFleets来定义MASTER、CORE和TASK节点(使用InstanceFleet,我不知道我得到的是哪种确切的InstanceTypes,这就是为什么我不想自己定义每个执行器的执行器、核心和内存的数量,但希望它能自动最大化/优化)。
这样,它只使用两个TASK节点(可能是准备好使用的前两个节点?),但在提供更多TASK节点并完成引导阶段时,它从未扩大规模。
在我的情况下,它之所以有效,是因为设置了spark.default.parallelism
参数(为我的TASK节点的核心总数),这与TASK InstanceFleet:的TargetOnDemandCapacity或TargetSpotCCapacity使用的数字相同
[
{
"Classification": "capacity-scheduler",
"Properties": {
"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.dynamicAllocation.enabled": "true",
"spark.default.parallelism", <Sum_of_Cores_of_all_TASK_nodes>
}
}
]
为了完整性:我使用一个CORE节点和几个TASK节点,主要是为了确保集群至少有3个节点(1个MASTER、1个CORE和至少一个TASK节点)。在我尝试只使用CORE节点之前,但在我的情况下,核心的数量是根据实际任务计算的,最终可能会得到一个仅由一个MASTER和一个CORE节点组成的集群。使用maximizeResourceAllocation
选项,这样的集群将永远运行而不做任何事情,因为运行纱线应用程序主机的执行器完全占用了单个CORE节点。