Spark只使用一台工作机器,而有更多可用机器



我正在尝试通过Spark并行化一个机器学习预测任务。我以前在其他任务中成功地使用过Spark很多次,以前没有遇到过并行化的问题。

在这个特定的任务中,我的集群有4个工作者。我在一个有4个分区的RDD上调用mapPartitions。map函数从磁盘加载一个模型(引导脚本分发执行此操作所需的所有内容;我已经验证了它在每个从属机器上存在),并对RDD分区中的数据点执行预测。

代码运行,但只使用一个执行器。其他执行器的日志显示"Shutdown hook called"。在不同的代码运行中,它使用不同的机器,但一次只能使用一台。

如何让Spark同时使用多台机器?

我通过齐柏林飞船笔记本电脑在亚马逊电子病历上使用PySpark。下面是代码片段。

%spark.pyspark
sc.addPyFile("/home/hadoop/MyClassifier.py")
sc.addPyFile("/home/hadoop/ModelLoader.py")
from ModelLoader import ModelLoader
from MyClassifier import MyClassifier
def load_models():
models_path = '/home/hadoop/models'
model_loader = ModelLoader(models_path)
models = model_loader.load_models()
return models
def process_file(file_contents, models):
filename = file_contents[0]
filetext = file_contents[1]
pred = MyClassifier.predict(filetext, models)
return (filename, pred)
def process_partition(file_list):
models = load_models()
for file_contents in file_list:
pred = process_file(file_contents, models)
yield pred

all_contents = sc.wholeTextFiles("s3://some-path", 4)
processed_pages = all_contents.mapPartitions(process_partition)
processedDF = processed_pages.toDF(["filename", "pred"])
processedDF.write.json("s3://some-other-path", mode='overwrite')

正如预期的那样,有四个任务,但它们都在同一个执行器上运行!

我已经运行了集群,可以在资源管理器中提供可用的日志。我只是还不知道去哪里找。

这里需要提到两点(但不确定它们是否能解决您的问题):

  1. wholeTextFiles使用扩展CombineFileInputFormatWholeTextFileInputFormat,由于CombineFileInputFormat,它将尝试将多组小文件组合到一个分区中。因此,例如,如果您将分区数设置为2,您"可能"会得到两个分区,但这并不能保证,这取决于您正在读取的文件的大小
  2. wholeTextFiles的输出是一个RDD,它在每个记录中都包含一个完整的文件(并且每个记录/文件不能拆分,因此它将以位于单个分区/工作进程中结束)。因此,如果您只读取一个文件,那么您将在一个分区中拥有完整的文件,尽管您在示例中将分区设置为4

进程有您指定的分区数量,但它是以序列化的方式进行的。

执行程序

该过程可能会增加默认的执行器数量。这可以在纱线资源管理器中看到。在您的案例中,所有处理都由一个执行器完成。如果执行器有多个核心,它将对作业进行并行处理。在emr中,您必须进行这些更改,以便为执行器提供一个以上的核心。

在我们的案例中,具体发生的是,数据很小,所以所有数据都在一个执行器中读取(即使用一个节点)。在没有以下属性的情况下,执行器仅使用单个核心。因此,所有任务都是序列化的。

设置属性

sudo  vi /etc/hadoop/conf/capacity-scheduler.xml

如所示设置以下属性

"yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalcul‌​ator"

为了使此属性适用,您必须重新启动纱线

sudo  hadoop-yarn-resourcemanager stop

重新启动纱线

sudo  hadoop-yarn-resourcemanager start 

当你的工作被提交时,看看纱线和火花ui

在Yarn中,您将看到更多执行器的核心

最新更新