如何在python/pyspark中并行化for循环(可能会在亚马逊服务器上的多个节点上运行)?



抱歉,如果这是一个非常基本的问题,但我只是找不到我的查询的简单答案。

我有一些计算密集型代码,令人尴尬地并行化。伪代码如下所示。

n = 500
rounds = 200
data = [d_1, ..., d_n]
values = [ 0 for _ in range(n) ]
for _ in range(rounds):
for i in range(n): # Inner Loop
values[i] = compute_stuff(data[i])
data = special_function(values)

内部循环的每次迭代需要 30 秒,但它们是完全独立的。因此,我想通过将计算拆分到 Amazon 上运行的 500 个独立节点来并行运行n=500次迭代,将内部循环的运行时间缩短到 ~30 秒。我该怎么做?

我假设 PySpark 是用于此目的的标准框架,而 Amazon EMR 是相关服务,使我能够在多个节点上并行运行它。所以我的问题是:我应该如何使用 PySpark 框架在 Amazon 服务器上的 500 个并行节点上增加上述代码?或者,我应该使用不同的框架和/或亚马逊服务来完成此操作吗?

以下是有关伪代码的一些详细信息。每个数据输入d_i都是一个自定义对象,尽管它可以转换为(并从中恢复)2个数字数组,A并在必要时Bcompute_stuff的返回值(因此,values的每个条目)也是自定义对象。虽然,同样,这个自定义对象可以转换为(并从中恢复)数字列表字典。此外,compute_stuff需要使用PyTorch和NumPy。最后,special_function不是像加法这样简单的事情,所以我认为它不能真正用作香草地图归约的"减少"部分。

任何帮助不胜感激!

根据你的描述,我不会使用 pyspark。要使用pyspark处理数据,您必须完全重写代码(仅举几例:使用rdd,使用Spark函数而不是python函数)。 我认为(在你的情况下!)使用像美妙的pymp这样的东西要容易得多。您不必对代码进行太多修改:

#still pseudocode
import pymp
n = 500
rounds = 200
data = [d_1, ..., d_n]
values = pymp.shared.list()
for _ in range(rounds):
with pymp.Parallel(n) as p:
for i in p.range(n):
values.append(compute_stuff(data[i]))
data = special_function(values)

如果values列表的顺序很重要,您可以使用p.thread_num +i来计算独特的指数。 Pymp 允许您使用机器的所有内核。当您想使用多台 aws 机器时,您应该查看 slurm。

最新更新