我正在尝试并行化一个接受多个常量参数的函数。到目前为止,我已经能够运行它,但它没有并行化该过程。我应该如何处理它?
我尝试执行以下操作:
import numpy as np
import multiprocessing
def optm(hstep,astep,time_ERA):
#this is a secondary function where I get arrays from a dataset
data = checkdate(time_ERA,2,4)
zlevels=data[0]
pottemp=data[1]
for z1 in np.linspace(0,zlevels[-1],hstep):
for z2 in np.linspace(0,zlevels[-1],hstep):
for a1 in np.linspace(0,0.01,astep): # max angle
for a2 in np.linspace(0,0.01,astep):
for a3 in np.linspace(0,0.01,astep):
result_array=another_function(zlevels,pottemp,z1,z2,a1,a2,a3) # this function is the one that does all the math in the code. Therefore, it take a lot of time to compute it.
return result_array
然后我以这种方式并行化了函数:
input_list = [(hstep,astep,time_ERA)] #creat a tuple for the necessary startmap
pool = multiprocessing.Pool()
result = pool.starmap(optm, input_list)
pool.close()
当我运行它时,它比没有并行化需要更长的时间。这是我第一次尝试并行化代码,所以我仍然不确定我是否应该使用 map 或 starmap 以及如何并行化它。
使用我评论中适合您问题的最小示例:
import multiprocessing
import time
import numpy as np
def optm(hstep,astep,time_ERA):
values = []
#this is a secondary function where I get arrays from a dataset
data = checkdate(time_ERA,2,4)
zlevels=data[0]
pottemp=data[1]
for z1 in np.linspace(0,zlevels[-1],hstep):
for z2 in np.linspace(0,zlevels[-1],hstep):
for a1 in np.linspace(0,0.01,astep): # max angle
for a2 in np.linspace(0,0.01,astep):
for a3 in np.linspace(0,0.01,astep):
values.append([zlevels,pottemp,z1,z2,a1,a2,a3])
return values
def mp_worker((zlevels,pottempt,z1,z2,a1,a2,a3)):
temp = another_function(zlevels,pottemp,z1,z2,a1,a2,a3)
# do something with the result
def mp_handler(data):
p = multiprocessing.Pool(2) # Change 2 to your cpu count
p.map(mp_worker, data)
if __name__ == '__main__':
data = optm(hstep,astep,time_ERA)
mp_handler(data)
您可以对每组参数执行pool.apply_async()
,而不是映射,或者使用多处理队列将作业馈送到子进程。我假设输出需要存储在一个数组中,因此队列将使这变得容易得多。您可以将作业馈送到队列并将结果推送到另一个队列,当所有进程完成后,从主线程中的结果队列收集结果并将它们存储到数组中。