在 pyspark 中并行运行多个函数调用



我有一个函数,我在pyspark-shell中运行

import pandas as pd
def compute(x):
data = pd.read_csv("/tmp/data_{}.csv".format(x))
# Some Spark processing
# Writes back final output in tmp

我想在 x 列表上并行运行它。 我试过这个——

x_list=[14,63]
from multiprocessing import Process
for x in x_list:
p = Process(target = compute, args = (x,))
p.start()

这样就完成了脚本。我希望它们在脚本完成之前完全运行。

我怎样才能做到这一点?

你必须掌握你启动的每个进程,并join()它们:

from multiprocessing import Process
import pandas as pd
def compute(x):
data = pd.read_csv("/tmp/data_{}.csv".format(x))
# Some Spark processing
# Writes back final output in tmp
x_list = [14,63]
processes = []
for x in x_list:
p = Process(target=compute, args=(x,))
processes.append(p)
p.start()
for p in processes:
p.join()

相关内容

  • 没有找到相关文章