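My class has a method that processes the object itself. However, when I use multiprocessing, the original objects are not modified. More generally, how can I multiprocess objects using their own methods? (I'm using Python 3.8.)
Here is my code: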
from multiprocessing import Pool

class MyObject(object):
    def __init__(self):
        self.level = 1

    def process(self):
        self.level = 2
        # and many other things that modify the object...

if __name__ == "__main__":
    objects = [MyObject() for i in range(10)]
    pool = Pool(3)
    async_results = []
    for o in objects:
        async_results.append(pool.apply_async(o.process, [], {}))
    pool.close()
    for r in async_results:
        r.get()
    for o in objects:
        print(o.level)  # unfortunately, 1, not 2
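Multiprocessing serializes (pickles) the objects and sends them to the other processes. Each worker therefore operates on its own copy, and only the return value of the call is serialized and sent back. As a result, those remote processes cannot modify the objects in the original memory space you sent them from.
Instead, take the objects returned through async_results and use those, or use the data in those results to modify objects on this side.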
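As a minimal sketch of that first option: this assumes process() is changed to return self, which the question's code does not do, and it uses a hypothetical helper named process_all. The list is then rebound to the copies that come back from the workers.

```python
from multiprocessing import Pool


class MyObject:
    def __init__(self):
        self.level = 1

    def process(self):
        self.level = 2
        # Assumption: returning self is added for illustration only;
        # the process() in the question returns nothing.
        return self


def process_all(objects, workers=3):
    """Run process() on each object in a pool and return the modified copies."""
    with Pool(workers) as pool:
        async_results = [pool.apply_async(o.process) for o in objects]
        # get() unpickles the copy that the worker modified and returned.
        return [r.get() for r in async_results]


if __name__ == "__main__":
    objects = [MyObject() for _ in range(10)]
    objects = process_all(objects)  # rebind to the returned copies
    print([o.level for o in objects])
```

The objects in the final list are new instances, so any other reference to the originals still sees level 1.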
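You just need to create "proxyable", managed objects, similar to the ones returned by the SyncManager instance that a call to multiprocessing.Manager() creates, for example a managed dict instance: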
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy

class MyObject(object):
    def __init__(self):
        self.level = 1

    def process(self):
        self.level = 2
        # and many other things that modify the object...

    def delegatee(self):
        return self

# Must explicitly create a customized proxy if attributes in addition to methods will be accessed
# And that forces us to name each method, e.g. process:
class MyObjectProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__getattr__', '__setattr__', 'process', 'delegatee')

    def process(self):
        callmethod = NamespaceProxy.__getattribute__(self, '_callmethod')
        return callmethod('process')

    def delegatee(self):
        callmethod = NamespaceProxy.__getattribute__(self, '_callmethod')
        return callmethod('delegatee')

    # or you can just use the following generic signature for each of your methods:
    #
    # def process(self, *args, **kwds):
    #     callmethod = NamespaceProxy.__getattribute__(self, '_callmethod')
    #     return callmethod('process', args, kwds)

class MyObjectManager(BaseManager):
    pass

if __name__ == "__main__":
    MyObjectManager.register('MyObject', MyObject, MyObjectProxy)
    with MyObjectManager() as manager:
        objects = [manager.MyObject() for i in range(10)]
        pool = Pool(3)
        async_results = []
        for o in objects:
            async_results.append(pool.apply_async(o.process, [], {}))
            # or just:
            #async_results.append(pool.apply_async(o.process))
        pool.close()
        for r in async_results:
            r.get()
        for o in objects:
            print(o.level)
        obj0 = objects[0]
        print(type(obj0))
        delegatee = obj0.delegatee()
        print(type(delegatee))
        print('delegatee level =', delegatee.level)
Prints:
2
2
2
2
2
2
2
2
2
2
<class '__main__.MyObjectProxy'>
<class '__main__.MyObject'>
delegatee level = 2
Note, however, that every method call or attribute access goes through the proxy and is more or less equivalent to a remote procedure call.
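Since each proxied access is roughly one round trip to the manager process, it can help to fetch several attributes in a single call. The following is only a sketch under assumptions: snapshot() is a hypothetical method that is not part of the code above, and the class is registered without a custom proxy, so the default proxy exposes its public methods but not its attributes.

```python
from multiprocessing.managers import BaseManager


class MyObject:
    def __init__(self):
        self.level = 1
        self.name = "xxx"

    def process(self):
        self.level = 2

    def snapshot(self):
        # Hypothetical helper: return all attributes in one call
        # instead of one proxied access per attribute.
        return dict(self.__dict__)


class MyObjectManager(BaseManager):
    pass


# No proxy type is given, so the default proxy exposes public methods only.
MyObjectManager.register("MyObject", MyObject)


def demo():
    with MyObjectManager() as manager:
        obj = manager.MyObject()  # proxy to an object living in the manager process
        obj.process()             # one round trip
        return obj.snapshot()     # one round trip for every attribute at once


if __name__ == "__main__":
    print(demo())
```

The returned dict is a plain copy, so later changes to the managed object are not reflected in it.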
Here is another solution, possibly simpler, for the case where only attributes are involved:
from multiprocessing import Pool

class MyObject(object):
    def __init__(self, id):
        self.id = id
        self.level = 1

    def process(self):
        self.level = 2  # modified attribute
        self.name = "xxx"  # new attribute
        return self.__dict__

if __name__ == "__main__":
    objects = [MyObject(i) for i in range(10)]
    pool = Pool(3)
    async_results = []
    for o in objects:
        async_results.append(pool.apply_async(o.process, [], {}))
    pool.close()
    results = []
    for r in async_results:
        results.append(r.get())
    for r in results:
        for o in objects:
            if o.id == r["id"]:
                o.__dict__.update(r)
                break
    for o in objects:
        print(o.__dict__)
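The nested loop above compares each result against every object. As a sketch of one alternative, assuming the ids are unique, a hypothetical helper named apply_updates can index the objects by id once and then apply each returned dict directly. The results list here is a stand-in for the dicts that r.get() would return.

```python
class MyObject:
    def __init__(self, id):
        self.id = id
        self.level = 1


def apply_updates(objects, results):
    """Copy each returned attribute dict onto the object with the same id."""
    by_id = {o.id: o for o in objects}  # assumes ids are unique
    for r in results:
        by_id[r["id"]].__dict__.update(r)


if __name__ == "__main__":
    objects = [MyObject(i) for i in range(3)]
    # Stand-in for the dicts returned by the pool, deliberately out of order.
    results = [{"id": i, "level": 2, "name": "xxx"} for i in reversed(range(3))]
    apply_updates(objects, results)
    print([o.__dict__ for o in objects])
```

Because async_results is built in the same order as objects, pairing them with zip(objects, results) would also work and would not need an id at all.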