使用PySpark而不是多处理模块-如何翻译我的代码



我目前正在使用多处理模块来并行化迭代,如本例所示。问题是,通过这种方式,我将只使用一个worker及其核心,但不使用所有可用的worker。此外,我无法并行化实验(我正在运行几个实验,每个实验都要进行多次迭代(。

这段代码的运行时间太长了,我知道使用PySpark可以大大减少运行时间。我的Spark知识很少,我不知道如何翻译这些代码才能将其用于Spark。这里使用的所有函数和类都是使用纯python(numpy和pandas(编写的

import concurrent.futures
import multiprocessing as mp
def process_simulation(experiment):
number_of_workers = mp.cpu_count()
with concurrent.futures.ProcessPoolExecutor(max_workers=number_of_workers) as executor:
results = list(executor.map(Simulation.simulation_steps(), iterations_generator()))
experiment.simulations = []
for i, v in enumerate(results):
experiment.simulations.append(results[v])

对于上下文,Experiment和Simulation是类(没有继承(。一个实验需要多次模拟才能完成。

谢谢!

您可以使用Fugue将这种类型的逻辑带到PySpark中,并使用最小的包装器。唯一需要做的就是从输入的DataFrame开始,然后你可以做一些类似的事情:

from fugue import transform
transform(input_df, Simulation.simulation_steps, schema=<your output schema here>, partition={"how": "per_row"}, engine="spark"

如果我对逻辑有更多的细节,我总是可以帮助更多的人把它做成这种形状。它可能只需要一个包装器函数。(个人简历中的联系信息(。

最新更新