为什么在一个空的Spark集群上不强制执行preferredLocation



我的Spark作业由3个工人组成,他们与需要读取的数据位于同一位置。我提交了一个带有一些元数据的RDD,工作任务将这些元数据转换为实际数据。例如,元数据可以包含要从本地工作文件系统读取的文件,而spark作业的第一阶段是将该文件读取到RDD分区中。

在我的环境中,数据可能不存在于所有3个工人身上,而且跨工人读取数据的成本太高(即,如果数据在工人1身上,那么工人2就无法联系并获取数据(。出于这个原因,我必须将分区强制分配给它们正在读取的数据的适当工作者。我有一种实现这一点的机制,我根据元数据中的预期工作者来检查工作者,如果他们不匹配,就会用描述性错误消息使任务失败。使用黑名单,我可以确保在另一个节点上重新安排任务,直到找到合适的节点。这很好,但作为一种优化,我想使用preferredLocation来帮助最初将任务分配给合适的员工,而不必经历尝试/重新安排过程。

是否使用makeRDD创建我的(元数据的(初始RDD,并按照这里的答案使用正确的首选位置:如何控制RDD分区的首选位置?,然而,它并没有表现出我所期望的行为。生成RDD的代码如下:

sc.makeRDD(taskAssigments)

其中taskAssignments的形式为:

val taskAssignments = mutable.ArrayBuffer[(String, Seq[String])]()
metadataMappings.foreach { case(k , v) => {
taskAssignments += (k + ":" + v.mkString(",") -> Seq(idHostnameMappings(k)))
}}

idHostMappings只是id的映射->hostName和我已经验证它包含正确的信息。

假设我的测试Spark集群是完全干净的,没有其他作业在上面运行,并且输入RDD中没有偏斜(它有3个分区来匹配3个工作线程(,我希望任务被分配到他们的首选位置。相反,我仍然看到错误消息,指示任务正在经历失败/重新安排过程。

我认为任务将在干净集群上的首选位置进行调度的假设正确吗?我还能做些什么来强制执行吗?

跟进:

我还能够创建一个简单得多的测试用例。我的3个spark工人分别命名为worker1、worker2和worker3,我运行以下程序:

import scala.collection.mutable
val someData = mutable.ArrayBuffer[(String, Seq[String])]()
someData += ("1" -> Seq("worker1"))
someData += ("2" -> Seq("worker2"))
someData += ("3" -> Seq("worker3"))
val someRdd = sc.makeRDD(someData)
someRdd.map(i=>i + ":" + java.net.InetAddress.getLocalHost().getHostName()).collect().foreach(println)

我希望看到1:worker1等,但实际上看到了

1:worker3
2:worker1
3:worker2

有人能解释这种行为吗?

原来问题出在我的环境上,而不是Spark。为了防止其他人遇到这种情况,问题是Spark工作人员默认情况下没有使用机器主机名。在每个工作者上设置以下环境变量纠正了它:SPARK_LOCAL_HOSTNAME:";工人1";

最新更新