我们有一个使用多个守护进程的应用程序,包括Django,Flask和Celery。
我们使用 Django rest 框架来获取来自客户端的请求,然后使用芹菜异步执行它们。这意味着我可能正在运行多个并发 python 进程,但我没有使用多处理或其他本机 python 并发处理。
尽管如此,我希望在不同的进程之间有一个共享锁。有没有办法在这种用例中使用python的Lock或Semaphore?
或者,我可以使用芹菜来处理它吗?
编辑:
对于我的用例,按照 @Matias Cicero 的建议,使用文件锁定就足够了。
但是,我选择了@Aaron答案作为接受,因为他提供了使用管理器,这解决了我的锁问题,以及提供了多个额外的共享资源。这实际上是一个非常有用的解决方案,可以满足我的需求。
谢谢大家!
您在上一条评论中描述的内容基本上正是您要做的。在启动同步管理器服务器并将函数注册到将返回该锁的服务器对象之前,应创建锁。然后,当您从其他进程连接到服务器时,您调用该注册函数以返回锁定对象(它实际上是锁的代理对象,但它的功能应该相同。这只会对可变容器(字典、列表等)产生进一步的影响,在这些容器中,对代理中的值的修改可能不会更新代理本身。
下面是创建服务器的一个简短示例,该服务器保存锁并将代理对象传递给请求它的客户端。
server.py:(首先在它自己的终端窗口中启动)
"""Multiprocessing sync manager server process"""
from multiprocessing import Lock
from multiprocessing.managers import BaseManager
class proxy_object_manager(BaseManager): pass
plock = Lock()
proxy_object_manager.register("get_lock", callable=lambda:plock)
#make sure this port is firewalled off from the rest of the world.
# arbitrary code execution is a very real possibility if this is insecure.
# be sure to read up on how this all works if you intend to expose this computer to the internet
m = proxy_object_manager(('',50000), b'password')
m.get_server().serve_forever()
client.py(在单独的终端中打开其中的两个或多个,然后按回车键进入其中一个,然后按另一个)
""" multiprocessing sync client example """
from multiprocessing.managers import BaseManager
# I played around a bit with making more functional subclasses
class proxy_object_client(BaseManager):
def __init__(self, proxy_getter, *args, **kwargs):
type(self).register(proxy_getter)
super().__init__(*args, **kwargs)
client = proxy_object_client("get_lock", ("127.0.0.1", 50000), b"password")
client.connect()
plock = client.get_lock()
print(plock)
#test lock functionality
print("aquiring lock")
plock.acquire()
print("got lock")
input("press enter to release lock")
plock.release()
print("lock released")
您应该看到 client.py 的第一个实例立即获取锁,但是第二个实例不应获取锁,直到您在第一个实例中释放它。
陷阱:
- 在我们的
client.py
中,plock 只公开它代理的对象的公共方法,所以私有方法(如处理"with"上下文的__enter__
和__exit__
)必须显式公开,或者通过使用子类化BaseProxy
指定代理类型来使其可用。
从 - python 3.3 开始,可以使用"with"上下文来启动服务器,但是如果您使用
manager.start()
,则无法使用 lambda 函数作为可调用对象,当您使用"with"时,该函数与manager.__enter__()
一起调用(这是因为它们不能被腌制(可能只是在 Windows 上)并发送到它创建的子进程) - 如果您要使用
manager.start()
或"with"上下文,还必须在 server.py 中使用if __name__ == "__main__":
习语来启动服务器,因为它们会启动一个子进程,该子进程将重新加载主模块,从而有效地造成分叉炸弹。幸运的是,多处理模块足够聪明,可以识别这一点并且不会这样做,但在这种情况下,它只会抛出异常。(通常,如果使用multiprocessing
,切勿在if __name__ ...
条件之外创建任何子进程。
下面是一个"server.py"示例,可以(在某种程度上)解决这些问题:
from multiprocessing import Lock, freeze_support
from multiprocessing.managers import BaseManager
class proxy_object_manager(BaseManager): pass
plock = Lock()
def lock_getter():
return plock
proxy_object_manager.register("get_lock", callable=lock_getter, exposed=['acquire', 'release', '__enter__', '__exit__'])
if __name__ == "__main__":
freeze_support() #needed on windows
with proxy_object_manager(('127.0.0.1',50000), b'password'):
input("press enter to stop server")