我正在尝试并行创建实例的属性,以了解有关多处理的更多信息。我的目标是避免以顺序的方式创建属性,假设它们彼此独立。我读到多处理创建了自己的空间,并且可以在进程之间建立连接。
我认为这种联系可以帮助我在工人中共享相同的对象,但我没有找到任何帖子可以展示实现这一点的方法。如果我尝试并行创建属性,那么当过程结束时,我无法在main上访问它们。有人能帮我吗?我需要做什么?
下面我提供了一个MRE,介绍我试图通过使用MPIRE包获得的内容。希望它能说明我的问题。
from mpire import WorkerPool
import os
class B:
def __init__(self):
pass
class A:
def __init__(self):
self.model = B()
def do_something(self, var):
if var == 'var1':
self.model.var1 = var
elif var == 'var2':
self.model.var2 = var
else:
print('other var.')
def do_something2(self, model, var):
if var == 'var1':
model.var1 = var
print(f"Worker {os.getpid()} is processing do_something2({var})")
elif var == 'var2':
model.var2 = var
print(f"Worker {os.getpid()} is processing do_something2({var})")
else:
print(f"Worker {os.getpid()} is processing do_something2({var})")
def init_var(self):
self.do_something('var1')
self.do_something('test')
print(self.model.var1)
print(vars(self.model).keys())
# Trying to create the attributes in parallel
print('')
self.model = B()
self.__sets_list = ['var1', 'var2', 'var3']
with WorkerPool(n_jobs=3, start_method='fork') as pool:
model = self.model
pool.set_shared_objects(model)
pool.map(self.do_something2,self.__sets_list)
print(self.model.var1)
print(vars(self.model).keys())
def main(): # this main will be in another file that call different classes
obj = A()
obj.init_var()
if __name__ == '__main__':
main = main()
它生成以下输出:
python src/test_change_object.py其他变体。var1dict_keys(['var1'])辅助进程20040正在处理do_something2(var1)Worker 20041正在处理do_something2(var2)Worker 20042正在处理do_something2(var3)追踪(最近一次通话):文件"/mnt/c/git/bioactives/src/test_change_object.py",第59行,位于main=main()文件"/mnt/c/git/bioactives/src/test_change_object.py",第55行,在main中obj.init_var()文件"/mnt/c/git/bioactives/src/test_change_object.py",第49行,在init_var中打印(self.model.var1)AttributeError:"B"对象没有属性"var1">
我感谢您的帮助。Tkx
不使用mpire的解决方案能工作吗?您可以通过使用多处理原语来实现您所追求的,即共享复杂对象的状态。
TL;DR
此代码有效:
import os
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import types
class ObjProxy(NamespaceProxy):
"""Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
pickable and can its state can be shared among different processes. """
def __getattr__(self, name):
result = super().__getattr__(name)
if isinstance(result, types.MethodType):
def wrapper(*args, **kwargs):
return self._callmethod(name, args, kwargs)
return wrapper
return result
class B:
def __init__(self):
pass
@classmethod
def create(cls, *args, **kwargs):
# Register class
class_str = cls.__name__
BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
# Start a manager process
manager = BaseManager()
manager.start()
# Create and return this proxy instance. Using this proxy allows sharing of state between processes.
inst = eval("manager.{}(*args, **kwargs)".format(class_str))
return inst
class A:
def __init__(self):
self.model = B.create()
def do_something(self, var):
if var == 'var1':
self.model.var1 = var
elif var == 'var2':
self.model.var2 = var
else:
print('other var.')
def do_something2(self, model, var):
if var == 'var1':
model.var1 = var
print(f"Worker {os.getpid()} is processing do_something2({var})")
elif var == 'var2':
model.var2 = var
print(f"Worker {os.getpid()} is processing do_something2({var})")
else:
print(f"Worker {os.getpid()} is processing do_something2({var})")
def init_var(self):
self.do_something('var1')
self.do_something('test')
print(self.model.var1)
print(vars(self.model).keys())
# Trying to create the attributes in parallel
print('')
self.model = B.create()
self.__sets_list = [(self.model, 'var1'), (self.model, 'var2'), (self.model, 'var3')]
with Pool(3) as pool:
# model = self.model
# pool.set_shared_objects(model)
pool.starmap(self.do_something2, self.__sets_list)
print(self.model.var1)
print(vars(self.model).keys())
def main(): # this main will be in another file that call different classes
obj = A()
obj.init_var()
if __name__ == '__main__':
main = main()
更长、详细的解释
以下是我认为正在发生的事情。即使您将self.model
设置为工作人员之间的共享对象,您在工作人员中更改它的事实也会强制进行复制(即,共享对象不可写)。来自mpire中共享对象的文档:
对于start方法fork,这些共享对象被视为写时复制,这意味着只有在对它们进行更改后,它们才会被复制。否则,它们共享相同的内存地址
因此,它建议使用方法fork
的共享对象仅在您只读取对象的情况下有用。文档还提供了这样一个用例
如果您想让工作人员访问在多次复制时无法放入内存的大型数据集,这很方便。
对此持保留态度,不过,从那以后,我再也没有使用过mpire。希望对图书馆有更多经验的人能够提供进一步的澄清。
无论如何,继续前进,您可以使用多处理管理器来实现这一点。管理器允许您在进程和工作者之间共享复杂的对象(在此上下文中是B类的对象)。您还可以使用它们来共享嵌套的字典、列表等。它们通过生成一个服务器进程来实现这一点,共享对象实际存储在服务器进程中,并允许其他进程通过代理访问该对象(稍后会详细介绍),以及通过pickle/unpickle传递给服务器进程和从服务器进程传递的任何参数和返回值。顺便说一句,使用酸洗/去酸洗也会导致限制性结构。例如,在我们的上下文中,这意味着为类B创建的任何函数参数和实例变量都应该是可拾取的。
回来,我提到我们可以通过代理访问服务器进程。代理基本上只是模仿原始对象的属性和功能的包装器。大多数使用特定的dunder方法,如__setattr__
和__getattr__
,下面给出了一个例子(从这里开始):
class Proxy(object):
def __init__(self, original):
self.original = original
def __getattr__(self, attr):
return getattr(self.original, attr)
class MyObj(object):
def bar(self):
print 'bar'
obj = MyObj()
proxy = Proxy(obj)
proxy.bar() # 'bar'
obj.bar() # 'bar'
使用代理的一大优点是它们是可拾取的,这在处理共享对象时很重要。在后台,每当您通过manager创建共享对象时,它都会为您创建一个代理。然而,这个默认代理(称为AutoProxy
)并不共享对象的命名空间。这对我们来说不起作用,因为我们正在使用类B的命名空间,并且希望它也被共享。因此,我们通过继承多处理提供的另一个未记录的代理来创建自己的代理:NamespaceProxy
。顾名思义,这个方法确实共享名称空间,但相反,它不共享任何实例方法。这就是为什么我们需要创建我们自己的代理,这是两全其美的:
from multiprocessing.managers import NamespaceProxy
import types
class ObjProxy(NamespaceProxy):
"""Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
pickable and can its state can be shared among different processes. """
def __getattr__(self, name):
result = super().__getattr__(name)
if isinstance(result, types.MethodType):
def wrapper(*args, **kwargs):
return self._callmethod(name, args, kwargs)
return wrapper
return result
关于为什么这样做的更多信息。请记住,这些代理不共享私有或受保护的属性/功能(请勾选此问题)。
在我们实现了这一点之后,剩下的只是一些样板代码,默认情况下,它使用这个代理为特定的数据类型创建可共享的复杂对象。在我们的上下文中,这意味着B类的代码将变成这样:
from multiprocessing import Manager, Queue, Pool
from multiprocessing.managers import BaseManager
class B:
def __init__(self):
pass
@classmethod
def create(cls, *args, **kwargs):
# Register class
class_str = cls.__name__
BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
# Start a manager process
manager = BaseManager()
manager.start()
# Create and return this proxy instance. Using this proxy allows sharing of state between processes.
inst = eval("manager.{}(*args, **kwargs)".format(class_str))
return inst
在上面的代码中,create
函数是一个通用的类构造函数,它自动使用我们的新代理和管理器来共享对象。它可以用于任何类,而不仅仅是B。现在唯一剩下的就是在init_var
中将mpire池的用法更改为多处理池。请注意,我们如何使用B.create()
而不是简单地使用B()
来创建类B的对象!:
def init_var(self):
self.do_something('var1')
self.do_something('test')
print(self.model.var1)
print(vars(self.model).keys())
# Trying to create the attributes in parallel
print('')
self.model = B.create()
self.__sets_list = [(self.model, 'var1'), (self.model, 'var2'), (self.model, 'var3')]
with Pool(3) as pool:
# model = self.model
# pool.set_shared_objects(model)
pool.starmap(self.do_something2, self.__sets_list)
print(self.model.var1)
print(vars(self.model).keys())
注意:我只在不使用";叉子";方法,而是";"产卵";方法启动进程。更多信息点击这里