为了在 Windows(miniconda)上的交互式 Python 中使用 multiprocessing,我找到了一段非常有用的代码,它运行良好。但是,这段代码无法把类中的 `self` 参数传递给要放入进程池执行的函数。下面是我的代码,它可以在 Google Colab 上运行,但在 Windows 的 IPython 中永远不会结束:
import multiprocessing
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def worker(d):
    """Do a fixed amount of CPU work and return a value based on ``d.bias``.

    Loops n over 0..9999 computing ``n ** (1/3) + d.bias`` and returns the
    value from the final iteration (n = 9999).
    """
    for n in range(10000):
        result = n ** (1 / 3) + d.bias
    return result
class dummy():
    """Holds a ``bias`` and fans ``worker`` out over a process pool."""

    def __init__(self):
        self.bias = 1000

    def calc(self):
        """Run ``worker(self)`` five times in a 12-process pool and print the results.

        ``self`` is sent to the worker processes as a task argument, so it must
        be picklable (multiprocessing pickles task arguments).
        """
        # ``with`` tears the pool down even if submitting a task raises.
        # Pool.__exit__ calls terminate(), so the results are collected
        # before leaving the block.
        with Pool(processes=12) as pool:
            results = [
                pool.apply_async(*make_applicable(worker, self))
                for _ in range(5)
            ]
            pool.close()
            pool.join()
            print([r.get() for r in results])
# With the "spawn" start method (the default on Windows), every worker process
# imports the main module. The guard stops those imports from re-running this
# code and recursively creating pools.
if __name__ == '__main__':
    d = dummy()
    d.calc()
如果我传递其他类型的变量,代码在 Windows 上运行良好,例如:
results[i] = (pool.apply_async(*make_applicable(worker,self.bias)))  # NOTE(review): sends only the int; worker() as shown reads d.bias, which an int lacks
但是当我把 `self` 传递给函数时,该过程永远不会结束。我不知道该怎么办。
`poolable.py` 的代码(取自这里)如下:
from types import FunctionType
import marshal
def _applicable(*args, **kwargs):
    """Rebuild the marshalled function carried in ``kwargs`` and call it.

    This is the counterpart of ``make_applicable``. The ``__pw_*`` entries hold
    the function's name plus its marshalled code, defaults, closure and
    ``__dict__``. They are popped off before the call, so only the caller's own
    positional and keyword arguments reach the rebuilt function.

    Note: the function is rebuilt against *this* module's globals, so any
    global name it uses must be resolvable here, or be imported inside the
    function body.
    """
    name = kwargs.pop('__pw_name')
    code = marshal.loads(kwargs.pop('__pw_code'))
    defs = marshal.loads(kwargs.pop('__pw_defs'))
    clsr = marshal.loads(kwargs.pop('__pw_clsr'))
    fdct = marshal.loads(kwargs.pop('__pw_fdct'))
    gbls = globals()  # the original function's globals are not transported
    func = FunctionType(code, gbls, name, defs, clsr)
    # Restore the attributes marshalled from ``f.__dict__`` onto the rebuilt
    # function's own ``__dict__`` (not under a single ``fdct`` attribute).
    func.__dict__.update(fdct)
    return func(*args, **kwargs)
def make_applicable(f, *args, **kwargs):
    """Package ``f`` and its arguments for ``Pool.apply_async``.

    Returns an ``(_applicable, args, kwargs)`` triple, so a call site can write
    ``pool.apply_async(*make_applicable(f, ...))``. Rather than sending ``f``
    itself, its code object, defaults, closure and ``__dict__`` are marshalled
    into extra ``__pw_*`` keyword arguments, and ``_applicable`` rebuilds and
    calls the function on the receiving side.

    Raises:
        ValueError: if ``f`` is not a plain Python function. ``marshal.dumps``
            also raises ``ValueError`` when a closure or attribute value has an
            unsupported type.
    """
    if not isinstance(f, FunctionType):
        raise ValueError('argument must be a function')
    packed = {
        '__pw_name': f.__name__,
        '__pw_code': marshal.dumps(f.__code__),
        '__pw_defs': marshal.dumps(f.__defaults__),
        '__pw_clsr': marshal.dumps(f.__closure__),
        '__pw_fdct': marshal.dumps(f.__dict__),
    }
    kwargs.update(packed)
    return _applicable, args, kwargs
def _mappable(x):
    """Rebuild the marshalled function carried in ``x`` and apply it to one item.

    ``x`` is one tuple produced by ``make_mappable``:
    ``(item, name, code, defaults, closure, func_dict)``. Every field except
    ``item`` and ``name`` is marshalled.

    Note: the function is rebuilt against *this* module's globals, so any
    global name it uses must be resolvable here.
    """
    item, name, code, defs, clsr, fdct = x
    func = FunctionType(
        marshal.loads(code),
        globals(),  # the original function's globals are not transported
        name,
        marshal.loads(defs),
        marshal.loads(clsr),
    )
    # Restore the marshalled attributes onto the function's own __dict__.
    func.__dict__.update(marshal.loads(fdct))
    return func(item)
def make_mappable(f, iterable):
    """Package ``f`` and ``iterable`` for ``Pool.map``-style calls.

    Returns ``(_mappable, gen)``. ``gen`` lazily yields
    ``(item, name, code, defaults, closure, func_dict)`` tuples, with the
    function parts marshalled once up front and shared by every item.

    Raises:
        ValueError: if ``f`` is not a plain Python function, or (from
            ``marshal.dumps``) if a closure or attribute value is unsupported.
    """
    if not isinstance(f, FunctionType):
        raise ValueError('argument must be a function')
    payload = (
        f.__name__,
        marshal.dumps(f.__code__),
        marshal.dumps(f.__defaults__),
        marshal.dumps(f.__closure__),
        marshal.dumps(f.__dict__),
    )
    return _mappable, ((item,) + payload for item in iterable)
编辑:
看起来这个问题不仅在传递 `self` 时出现,传递给 `make_applicable` 函数的任何其他类的实例也会出现。下面的代码同样不会结束:
class dummy2(object):
    """Minimal object carrying a ``bias`` attribute."""

    def __init__(self):
        # Instance attribute, read by ``worker`` as ``d.bias``.
        self.bias = 1000
class dummy():
    """Fans ``worker`` out over a process pool, one fresh ``dummy2`` per task."""

    def __init__(self):
        self.bias = 1000

    def copy(self):
        """Return a deep copy of this object."""
        # The snippet never imports the ``copy`` module, so the bare
        # ``copy.deepcopy`` raised NameError. Import locally.
        from copy import deepcopy
        return deepcopy(self)

    def calc(self):
        """Run ``worker`` on five new ``dummy2`` instances and print the results."""
        # ``with`` tears the pool down even if submitting a task raises.
        # Results are collected before Pool.__exit__ calls terminate().
        with Pool(processes=12) as pool:
            results = [
                pool.apply_async(*make_applicable(worker, dummy2()))
                for _ in range(5)
            ]
            pool.close()
            pool.join()
            print([r.get() for r in results])
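使用 IPython 控制台:把代码放在一个模块文件(`mp.py`)中,并确保类的实例化和方法调用都在 `if __name__ == '__main__':` 条件下执行。(原因:Windows 上 multiprocessing 以 spawn 方式启动子进程,子进程需要导入主模块才能找到类的定义,以便反序列化 `self` 这类实例;而在交互式会话中定义的类无法被子进程导入。)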
import multiprocessing
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def worker(d):
    """CPU-bound demo task: return ``9999 ** (1/3) + d.bias``.

    The value is recomputed for every n in 0..9999 to generate load. Only the
    last iteration's value is returned.
    """
    n = 0
    while n < 10000:
        last = n ** (1 / 3) + d.bias
        n += 1
    return last
class Dummy():
    """Holds a ``bias`` and fans ``worker`` out over a process pool."""

    def __init__(self):
        self.bias = 10

    def calc(self):
        """Run ``worker(self)`` five times in a 12-process pool.

        Returns:
            A list with the five results, in submission order.
        """
        # ``with`` tears the pool down even if submitting a task raises.
        # Results are collected before Pool.__exit__ calls terminate().
        with Pool(processes=12) as pool:
            results = [
                pool.apply_async(*make_applicable(worker, self))
                for _ in range(5)
            ]
            pool.close()
            pool.join()
            return [r.get() for r in results]
# Entry point. With the "spawn" start method (the Windows default) each worker
# process imports this module, and the guard keeps those imports from
# re-running the pool.
if __name__ == '__main__':
    d = Dummy()
    print(d.calc())
然后在控制台中运行模块:
In [1]: runfile('P:/pyProjects3/mp.py', wdir='P:/pyProjects3')
[31.543628731482663, 31.543628731482663, 31.543628731482663, 31.543628731482663, 31.543628731482663]
我也装了 Jupyter Notebook(Anaconda),但不太会用;如果弄清楚了,我会更新这个答案。
尝试用下面的代码进行测试:
import pickle
def worker(d):
    """Unpickle ``d`` and burn some CPU using its ``bias``.

    Args:
        d: pickled bytes of an object with a ``bias`` attribute.

    Returns:
        ``9999 ** (1/3) + bias``, the value from the final loop iteration.
    """
    # Import locally: poolable's _applicable rebuilds this function against
    # poolable's own globals, where ``pickle`` is not defined, so a
    # module-level import in this file is not visible at call time.
    import pickle
    # pickle.loads can execute arbitrary code for crafted input. This is only
    # acceptable because the bytes are expected to come from pickle.dumps in
    # this same program.
    d = pickle.loads(d)
    for i in range(10000):
        j = i ** (1 / 3) + d.bias
    return j
class dummy():
    """Sends a pickled copy of itself to ``worker`` via a process pool."""

    def __init__(self):
        self.bias = 1000

    def calc(self):
        """Run ``worker`` five times on ``pickle.dumps(self)`` and print the results."""
        # self does not change inside the loop, so pickle it once.
        payload = pickle.dumps(self)
        # ``with`` tears the pool down even if submitting a task raises.
        # Results are collected before Pool.__exit__ calls terminate().
        with Pool(processes=12) as pool:
            results = [
                pool.apply_async(*make_applicable(worker, payload))
                for _ in range(5)
            ]
            pool.close()
            pool.join()
            print([r.get() for r in results])