Why does this Python multiprocessing script slow down after a while?



I read an old question, Why does this python multiprocessing script slow down after a while?, and many others before posting this one. They do not answer my problem.

The idea behind the script. The script generates 256x256 arrays in a serial loop. The elements of each array are computed one by one from a list of dictionaries holding the relevant parameters, one dictionary per array element (256x256 in total per list). The list is my way of enabling parallel computation.

The problem. At the beginning, data generation speeds up from a dozen or so seconds to a few seconds. Then, after a few iterations, it starts slowing down by a fraction of a second with each newly generated array, up to the point where it takes forever to compute anything.

Additional info.

  1. I am using the pool.map function. After making some small changes to identify which elements are to be computed, I also tried map_async. Unfortunately it is slower, because I need to initialize the pool each time an array is computed.
  2. When using pool.map, I initialize the pool once, before anything starts. This way I hope to save the pool initialization time compared to map_async.
  3. CPU usage is low, up to about 18%.
  4. In my case the hard drive isn't a bottleneck. All the data needed for the calculations is in RAM. I also don't save the data to the hard drive, keeping everything in RAM.
  5. I also checked whether the problem persists if I use a different number of cores, 2-24. No change either.
  6. I did some additional tests, running and terminating a pool a. every time an array is generated, b. every 10 arrays. In each case I noticed that the execution slows down relative to the previous pool's execution time, i.e. if the previous one slowed down to 5 s, the next one takes 5.X s, and so on. The only time the execution doesn't slow down is when I run the code serially.
  7. Working environment: Windows 10, Python 3.7, conda 4.8.2, Spyder 4.

The question: What may cause this slowdown when only the CPU and RAM are involved (no hard-drive slowdown)? Any idea?
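Points 1 and 2 above can be illustrated with a minimal toy sketch (hypothetical function names, not the actual script): creating a fresh pool on every iteration pays the worker start-up cost repeatedly, while one long-lived pool pays it once.

```python
import multiprocessing as mp

def work(x):
    # Stand-in for computing one array element.
    return x * x

def fresh_pool_each_iteration(n_iters, n_tasks):
    # Pattern a: a new pool per iteration -- start-up cost paid n_iters times.
    results = []
    for _ in range(n_iters):
        with mp.Pool(4) as pool:
            results.append(pool.map(work, range(n_tasks)))
    return results

def single_reused_pool(n_iters, n_tasks):
    # Pattern b: one pool reused across iterations -- start-up cost paid once.
    results = []
    with mp.Pool(4) as pool:
        for _ in range(n_iters):
            results.append(pool.map(work, range(n_tasks)))
    return results

if __name__ == '__main__':
    # Both patterns compute identical results; only the spawn overhead differs.
    assert fresh_pool_each_iteration(3, 10) == single_reused_pool(3, 10)
```

The difference is especially visible on Windows, where every worker is a freshly spawned interpreter rather than a forked process.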


Updated code:

import multiprocessing as mp
from tqdm import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'],
        nu1=arg['nu1'],
        nu2=arg['nu2'],
        innt=arg['innt'],
        nu1exp=arg['nu1exp'],
        nu2exp=arg['nu2exp'],
        ii=arg['ii'],
        jj=arg['jj'],
        llp=arg['self'].llp,
        rr=arg['self'].rr,
    )

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length, length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0

    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp):
        return [random.sample(range(self.length), int(np.random.random()*12)+1) for ii in range(self.ll_len)]

    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()

    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)

    """original function is different, modified to return something"""
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip

    """original function is different, modified to return something"""
    def __get_pp(self, nu1):
        return np.exp(nu1)

    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return

    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, self.inshape)

    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return

    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]

    """NOTE, lp is not used here for the example purpose but
    in the original code, it's a very important variable containing
    relevant data for the calculations"""
    def generate(self, lp={}):
        """create a list that is used for the creation of the 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()

        self.op = self.get_op()

        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self,
             'nu1': nu1,
             'nu2': nu2,
             'ii': ii,
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]

        pool = None
        if self.multiprocessing:
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)

        """number of arrays is equal to len of ll, here 300"""
        for ll_ in tqdm(ll):
            """Generate data"""
            self.__generate(ll_, lp, pool, args_tmp)

        """Terminate the pool of worker processes"""
        if self.multiprocessing:
            pool.terminate()

    def __generate(self, ll, lp, pool=None, args_tmp=[]):
        """In the original code there are plenty of other things done here
        using the class' methods, they are not shown for the example purpose"""
        self.dummy_function()

        self.llp = self.get_llp(ll, lp)
        """originally the value is taken from lp"""
        self.rr = self.rr

        if self.multiprocessing and pool:
            result = pool.map(wrapper_, args_tmp)
        else:
            result = [wrapper_(arg) for arg in args_tmp]

        """In the original code there are plenty of other things done here
        using the class' methods, they are not shown for the example purpose"""
        result = self.dummy_function_2(result)

    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii=0, jj=0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii:
            return 0
        elif rr == -1 and ii > jj:
            return 0
        elif rr == 0:
            """do nothing"""

        ll1 = []
        ll2 = []

        """In the original code there are plenty of other things done here
        using the class' methods, they are not shown for the example purpose"""
        self.dummy_function_3()

        for kk, ll in enumerate(llp):
            ll1.append(
                self.__get_pp(nu1) *
                nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
                self.__get_pp(nu2) *
                nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )

        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result


g = tmp(False, 256, 300)
g.generate()

It's hard to tell what is happening in your algorithm. I don't know a lot about multiprocessing, but it is probably safer to stick with functions and avoid passing self down into the pooled processes. This is what happens when you pass args_tmp to wrapper_ in pool.map(). Also, overall, try to minimize how much data is passed between the parent and child processes. I tried to move the generation of the lp list into the pool workers to prevent passing excessive data.

Finally, although I don't think it matters in this example code, you should be cleaning up after using pool, or using pool with with.

I rewrote some of your code to try things out, and this seems faster, but I'm not 100% sure it matches your algorithm. Some of the variable names are hard to distinguish.

This runs a lot faster for me, but I can't tell whether it is producing your solutions accurately. If it is accurate, my final conclusion is that the extra data passing was significantly slowing down the pool workers.

#main.py
if __name__ == '__main__':
    import os
    import sys
    file_dir = os.path.dirname(__file__)
    sys.path.append(file_dir)
    from tmp import generate_1
    parallel = True
    generate_1(parallel)

#tmp.py
import multiprocessing as mp
import numpy as np
import random
from tqdm import tqdm
from itertools import starmap

def wrapper_(arg):
    return arg['self'].generate_array_elements(
        nu1=arg['nu1'],
        nu2=arg['nu2'],
        ii=arg['ii'],
        jj=arg['jj'],
        lp=arg['self'].lp,
        nu1exp=arg['nu1exp'],
        nu2exp=arg['nu2exp'],
        innt=arg['innt']
    )

def generate_1(parallel):
    """create a list that is used for the creation of the 2-D array"""
    il = np.random.random(256)
    """generating params for parallel data generation"""
    """some params are also calculated here to speed up the calculation process
    because they are always the same so they can be calculated just once"""
    """this code creates a list of 256*256 elements"""
    args_tmp = [
        {
            'nu1': nu1,
            'nu2': nu2,
            'ii': ii,
            'jj': jj,
            'innt': np.random.random()*nu1+np.random.random()*nu2,
            'nu1exp': np.exp(1j*nu1),
            'nu2exp': np.exp(1j*nu2),
        } for ii, nu1 in enumerate(il) for jj, nu2 in enumerate(il)]

    """get list of arrays to generate"""
    ip_list = [random.sample(range(256), int(np.random.random()*12)+1) for ii in range(300)]
    map_args = [(idx, ip, args_tmp) for idx, ip in enumerate(ip_list)]
    """init pool, separate function to do other important things"""
    if parallel:
        with mp.Pool(8, maxtasksperchild=10000) as pool:
            result = pool.starmap(start_generate_2, map_args)
    else:
        result = starmap(start_generate_2, map_args)
    # Wrap iterator in list call.
    return list(result)

def start_generate_2(idx, ip, args_tmp):
    print('starting {idx}'.format(idx=idx))
    runner = Runner()
    result = runner.generate_2(ip, args_tmp)
    print('finished {idx}'.format(idx=idx))
    return result

class Runner():
    def generate_2(self, ip, args_tmp):
        """NOTE, the method is much more extensive and uses other methods of the class"""
        """so it must remain a method of the class that is not static!"""
        self.lp = [{'a': np.random.random(), 'b': np.random.random()} for ii in ip]
        """this part creates a 1-D array of the length of args_tmp, that's 256*256"""
        result = map(wrapper_, [dict(args, self=self) for args in args_tmp])
        """it's then reshaped to a 2-D array"""
        result = np.reshape(list(result), (256, 256))
        return result

    def generate_array_elements(self, nu1, nu2, ii, jj, lp, nu1exp, nu2exp, innt):
        """doing heavy calc"""
        """here is something else"""
        if ii > jj: return 0

        ll1 = []
        ll2 = []
        for kk, ll in enumerate(lp):
            ll1.append(nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random()))
            ll2.append(nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random()))

        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result

I added a generic template to show an architecture where you can separate the preparation of the shared args from the task runner and still use classes. The strategy here is not to create too many tasks (300 seemed faster than trying to split them into 64000), and to not pass too much data to each task. The interface of launch_task should be kept as simple as possible; in my refactoring of your code it is equivalent to start_generate_2.

import multiprocessing
from itertools import starmap

class Launcher():
    def __init__(self, parallel):
        self.parallel = parallel

    def generate_shared_args(self):
        return [(i, j) for i, j in enumerate(range(300))]

    def launch(self):
        shared_args = self.generate_shared_args()
        if self.parallel:
            with multiprocessing.Pool(8) as pool:
                result = pool.starmap(launch_task, shared_args)
        else:
            result = starmap(launch_task, shared_args)
        # Wrap in list to resolve iterable.
        return list(result)

def launch_task(i, j):
    task = Task(i, j)
    return task.run()

class Task():
    def __init__(self, i, j):
        self.i = i
        self.j = j

    def run(self):
        return self.i + self.j

if __name__ == '__main__':
    parallel = True
    launcher = Launcher(parallel)
    print(launcher.launch())

There is a warning about cleaning up pools in the pool docs here: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
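The cleanup the docs recommend can be done with a context manager; a minimal sketch with toy function names:

```python
import multiprocessing as mp

def square(x):
    return x * x

def run(tasks):
    # The 'with' block calls pool.terminate() on exit, so the workers
    # are cleaned up even if an exception is raised during map().
    with mp.Pool(4) as pool:
        return pool.map(square, tasks)

if __name__ == '__main__':
    print(run(range(8)))
```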

The first item here discusses avoiding shared state, and specifically large amounts of data: https://docs.python.org/3/library/multiprocessing.html#programming-guidelines
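One way to follow that guideline when every task needs the same large read-only structure (like args_tmp here) is a pool initializer, which ships the data once per worker instead of once per task. A hypothetical sketch (the names are illustrative, not from the question):

```python
import multiprocessing as mp

_shared = None  # filled in once per worker process

def init_worker(shared):
    # Runs once in each worker when the pool starts; the large
    # structure is pickled once per worker, not once per task.
    global _shared
    _shared = shared

def task(i):
    # Tasks receive only a small index and read the shared table.
    return _shared[i] * 2

def run():
    big_table = list(range(100_000))  # stand-in for a large args_tmp
    with mp.Pool(4, initializer=init_worker, initargs=(big_table,)) as pool:
        return pool.map(task, range(10))

if __name__ == '__main__':
    print(run())
```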

Ian Wilson's suggestions were very helpful, and one of them resolved the issue. That's why his answer is marked as the correct one.

As he suggested, it's better to call pool on a smaller number of tasks. So instead of calling pool.map for each array (N) 256*256 times, once per array element (N*256*256 tasks in total), I now call pool.map N times on a function that calculates the whole array. The array calculation inside that function is done serially.

I still send self as a parameter because it's needed in the function, but it doesn't have any impact on the performance.

That small change sped up the calculation of an array from 7-15 s/it up to 1.5 it/s-2 s/it!
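The change can be distilled into a sketch (hypothetical names and tiny sizes, not the real script): one pool task now computes an entire array serially, so the pool schedules N coarse tasks instead of N*256*256 tiny ones.

```python
import multiprocessing as mp
import numpy as np

SIZE = 16  # 256 in the real script; kept small so the demo runs quickly

def element(ii, jj):
    # Stand-in for generate_array_elements: one cheap element computation.
    return float(ii * jj)

def generate_array(idx):
    # One coarse-grained task: build the whole array serially.
    arr = np.empty((SIZE, SIZE))
    for ii in range(SIZE):
        for jj in range(SIZE):
            arr[ii, jj] = element(ii, jj)
    return arr

def run(n_arrays):
    # The pool now schedules n_arrays tasks, not n_arrays * SIZE * SIZE.
    with mp.Pool(4) as pool:
        return pool.map(generate_array, range(n_arrays))

if __name__ == '__main__':
    arrays = run(8)
    print(len(arrays), arrays[0].shape)
```

The per-task scheduling and pickling overhead is paid N times instead of N*256*256 times, which is where the speedup comes from.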

Current code:

import multiprocessing as mp
import tqdm
import numpy as np
import random

def wrapper_(arg):
    return tmp.generate_array_elements(
        self=arg['self'],
        nu1=arg['nu1'],
        nu2=arg['nu2'],
        innt=arg['innt'],
        nu1exp=arg['nu1exp'],
        nu2exp=arg['nu2exp'],
        ii=arg['ii'],
        jj=arg['jj'],
        llp=arg['self'].llp,
        rr=arg['self'].rr,
    )

"""NEW WRAPPER HERE"""
"""Sending self doesn't have a bad impact on the performance, at least I don't complain :)"""
def generate_js_(arg):
    tmp._tmp__generate(arg['self'], arg['ll'], arg['lp'], arg['pool'], arg['args_tmp'])

class tmp:
    def __init__(self, multiprocessing, length, n_of_arrays):
        self.multiprocessing = multiprocessing
        self.inshape = (length, length)
        self.length = length
        self.ll_len = n_of_arrays
        self.num_cpus = 8
        self.maxtasksperchild = 10000
        self.rr = 0

    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_ll(self, lp):
        return [random.sample(range(self.length), int(np.random.random()*12)+1) for ii in range(self.ll_len)]

    """original function is different, modified to return something"""
    def get_ip(self): return np.random.random()

    """original function is different, modified to return something"""
    def get_op(self): return np.random.random(self.length)

    """original function is different, modified to return something"""
    def get_innt(self, nu1, nu2, ip):
        return nu1*nu2/ip

    """original function is different, modified to return something"""
    def __get_pp(self, nu1):
        return np.exp(nu1)

    """dummy function for the example purpose"""
    def dummy_function(self):
        """do important stuff"""
        return

    """dummy function for the example purpose"""
    def dummy_function_2(self, result):
        """do important stuff"""
        return np.reshape(result, self.inshape)

    """dummy function for the example purpose"""
    def dummy_function_3(self):
        """do important stuff"""
        return

    """original function is different, modified to return something"""
    """for the example purpose, lp is not relevant here but in general is"""
    def get_llp(self, ll, lp):
        return [{'a': np.random.random(), 'b': np.random.random()} for ii in ll]

    """NOTE, lp is not used here for the example purpose but
    in the original code, it's a very important variable containing
    relevant data for the calculations"""
    def generate(self, lp={}):
        """create a list that is used for the creation of the 2-D array"""
        """providing here a dummy pp param to get_ll"""
        ll = self.get_ll(lp)
        ip = self.get_ip()

        self.op = self.get_op()

        """length of args_tmp = self.length * self.length = 256 * 256"""
        args_tmp = [
            {'self': self,
             'nu1': nu1,
             'nu2': nu2,
             'ii': ii,
             'jj': jj,
             'innt': np.abs(self.get_innt(nu1, nu2, ip)),
             'nu1exp': np.exp(1j*nu1*ip),
             'nu2exp': np.exp(1j*nu2*ip),
             } for ii, nu1 in enumerate(self.op) for jj, nu2 in enumerate(self.op)]

        pool = None

        """MAJOR CHANGE IN THIS PART AND BELOW"""
        map_args = [{'self': self, 'idx': (idx, len(ll)), 'll': ll, 'lp': lp, 'pool': pool, 'args_tmp': args_tmp} for idx, ll in enumerate(ll)]
        if self.multiprocessing:
            pool = mp.Pool(self.num_cpus, maxtasksperchild=self.maxtasksperchild)

            for _ in tqdm.tqdm(pool.imap_unordered(generate_js_, map_args), total=len(map_args)):
                pass
            pool.close()
            pool.join()
        else:
            for map_arg in tqdm.tqdm(map_args):
                generate_js_(map_arg)

    def __generate(self, ll, lp, pool=None, args_tmp=[]):
        """In the original code there are plenty of other things done here
        using the class' methods, they are not shown for the example purpose"""
        self.dummy_function()

        self.llp = self.get_llp(ll, lp)
        """originally the value is taken from lp"""
        self.rr = self.rr

        """REMOVED PARALLEL CALL HERE"""
        result = [wrapper_(arg) for arg in args_tmp]

        """In the original code there are plenty of other things done here
        using the class' methods, they are not shown for the example purpose"""
        result = self.dummy_function_2(result)

    """original function is different"""
    def generate_array_elements(self, nu1, nu2, llp, innt, nu1exp, nu2exp, ii=0, jj=0, rr=0):
        if rr == 1 and self.inshape[0] - 1 - jj < ii:
            return 0
        elif rr == -1 and ii > jj:
            return 0
        elif rr == 0:
            """do nothing"""

        ll1 = []
        ll2 = []

        """In the original code, there are plenty of other things done here
        using the class' methods, they are not shown for the example purpose"""
        self.dummy_function_3()

        for kk, ll in enumerate(llp):
            ll1.append(
                self.__get_pp(nu1) *
                nu1*nu2*nu1exp**ll['a']*np.exp(1j*np.random.random())
            )
            ll2.append(
                self.__get_pp(nu2) *
                nu1*nu2*nu2exp**ll['b']*np.exp(1j*np.random.random())
            )

        t1 = sum(ll1)
        t2 = sum(ll2)
        result = innt*np.abs(t1 - t2)
        return result


g = tmp(False, 256, 300)
g.generate()

Thanks again, Ian.

Latest update