下面的代码是一个有5件商品的商店,三个顾客每人要求一件商品。
import multiprocessing as mp
class Shop:
def __init__(self, stock=5):
self.stock = stock
def get_item(self, l, x):
l.acquire()
if self.stock >= x:
self.stock -= x
print(f"{self.stock} = remaining")
l.release()
if __name__ == "__main__":
l = mp.Lock()
obj = Shop()
p1 = mp.Process(target=obj.get_item, args=(l, 1))
p2 = mp.Process(target=obj.get_item, args=(l, 1))
p3 = mp.Process(target=obj.get_item, args=(l, 1))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("Final: ", obj.stock)
我得到的输出如下
4 = remaining
4 = remaining
4 = remaining
Final: 5
然而,由于我使用的是Lock
,我希望它是
4 = remaining
3 = remaining
2 = remaining
Final: 2
问题:如何实现上述输出只是锁(没有进程通信即没有管道/队列)?
这段代码没有像您期望的那样工作的原因是多进程没有与子进程共享它的状态。这意味着您启动的每个进程,p1
,p2
和p3
,都获得Shop
类对象的副本。它不是同一个物体。有两个解决这个问题的方法是,与进程共享实例属性stock
,或者共享整个对象本身。如果商店对象保存需要在进程之间共享的其他数据,则第二种方法可能更适合您的大型用例。
方法1:
要只共享stock
实例变量的值,可以使用multiprocessing.Value。使用this创建共享整数并访问其值的方法如下:
shared_int = multiprocessing.Value('i', 5)
print(f'Value is {shared_int.value}') # 5
适应您的用例,代码将变成:
import multiprocessing
class Shop:
def __init__(self, stock=5):
self.stock = multiprocessing.Value('i', stock)
def get_item(self, l, x):
l.acquire()
if self.stock.value >= x:
self.stock.value -= x
print(f"{self.stock.value} = remaining")
l.release()
if __name__ == "__main__":
l = multiprocessing.Lock()
obj = Shop()
p1 = multiprocessing.Process(target=obj.get_item, args=(l, 1))
p2 = multiprocessing.Process(target=obj.get_item, args=(l, 1))
p3 = multiprocessing.Process(target=obj.get_item, args=(l, 1))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("Final: ", obj.stock.value)
4 = remaining
3 = remaining
2 = remaining
Final: 2
方法2
共享整个复杂对象是一个更复杂的过程。我最近详细回答了一个关于共享复杂对象的类似问题(比如本例中的Shop类的对象),它还涵盖了下面提供的代码背后的推理。我建议您阅读它,因为它更详细地解释了底部提供的代码背后的逻辑。这个用例的唯一主要区别是,您将希望使用multiprocess (multiprocessing的一个分支)而不是multiprocessing。这个库的工作原理与内置的多处理相同,除了它提供了我们需要的更好的pickle支持。
基本上,您将希望使用多处理。管理器共享状态,并使用合适的代理访问状态。下面代码中提供的ObjProxy
就是这样一个代理,它共享名称空间和实例方法(除了受保护/私有属性)。有了这些之后,您只需要使用管理器和代理创建类Shop
的对象。这是使用Shop
类的新添加的create
方法完成的。这是一个类构造函数,Shop
的所有对象都应该使用这个方法创建,而不是直接调用构造函数。完整代码:
import multiprocess
from multiprocess import Manager, Process
from multiprocess.managers import NamespaceProxy, BaseManager
import types
class ObjProxy(NamespaceProxy):
"""Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
pickable and can its state can be shared among different processes. """
def __getattr__(self, name):
result = super().__getattr__(name)
if isinstance(result, types.MethodType):
def wrapper(*args, **kwargs):
return self._callmethod(name, args, kwargs)
return wrapper
return result
class Shop:
def __init__(self, stock=5):
self.stock = stock
@classmethod
def create(cls, *args, **kwargs):
# Register class
class_str = cls.__name__
BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
# Start a manager process
manager = BaseManager()
manager.start()
# Create and return this proxy instance. Using this proxy allows sharing of state between processes.
inst = eval("manager.{}(*args, **kwargs)".format(class_str))
return inst
def get_item(self, l, x):
with l:
if self.stock >= x:
self.stock -= x
print(f"{self.stock} = remaining")
def k(self, l, n):
pass
if __name__ == "__main__":
manager = Manager()
l = manager.Lock()
obj = Shop.create()
p1 = Process(target=obj.get_item, args=(l, 1, ))
p2 = Process(target=obj.get_item, args=(l, 1, ))
p3 = Process(target=obj.get_item, args=(l, 1, ))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print("Final: ", obj.stock)
4 = remaining
3 = remaining
2 = remaining
Final: 2
注意说明:
manager = Manager()
l = manager.Lock()
在前面的示例中,我们不需要为锁创建管理器(以及随后的代理)的原因概述在这里。上面的代码在没有创建代理的情况下不能工作的原因是我们不再在主进程中创建进程,并且锁不存在于当前进程的内存空间中(因为为我们的复杂对象创建一个管理器来共享它的状态会生成它自己的服务器进程)