我想为与多个工作线程(即多个进程(一起运行的 Flask 应用程序提供共享状态。
引用有关此主题的类似问题的答案:
不能使用全局变量来保存此类数据。[...]使用 Flask 外部的数据源来保存全局数据。数据库、memcached 或 redis 都是适当的单独存储区域,具体取决于您的需求。
(来源:全局变量线程在烧瓶中安全吗?如何在请求之间共享数据?
我的问题是关于如何在 Flask "外部"提供数据的建议的最后一部分。目前,我的 Web 应用程序非常小,我想避免对其他程序的要求或依赖。如果我不想在后台运行 Redis 或其他任何内容,而是使用 Web 应用程序的 Python 代码提供所有内容,我有什么选择?
如果 Web 服务器的工作线程类型与multiprocessing
模块兼容,则可以使用multiprocessing.managers.BaseManager
为 Python 对象提供共享状态。一个简单的包装器可能如下所示:
from multiprocessing import Lock
from multiprocessing.managers import AcquirerProxy, BaseManager, DictProxy
def get_shared_state(host, port, key):
shared_dict = {}
shared_lock = Lock()
manager = BaseManager((host, port), key)
manager.register("get_dict", lambda: shared_dict, DictProxy)
manager.register("get_lock", lambda: shared_lock, AcquirerProxy)
try:
manager.get_server()
manager.start()
except OSError: # Address already in use
manager.connect()
return manager.get_dict(), manager.get_lock()
您可以将数据分配给shared_dict
,使其可跨流程访问:
HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)
shared_dict["number"] = 0
shared_dict["text"] = "Hello World"
shared_dict["array"] = numpy.array([1, 2, 3])
但是,您应该注意以下情况:
- 使用
shared_lock
防止在覆盖shared_dict
中的值时出现争用条件。(参见下面的烧瓶示例。 - 没有数据持久性。如果重新启动应用,或者如果主(第一个(
BaseManager
进程死亡,则共享状态将消失。 - 通过这种简单的
BaseManager
实现,您无法直接编辑shared_dict
中的嵌套值。例如,shared_dict["array"][1] = 0
不起作用。您必须编辑副本,然后将其重新分配给字典键。
烧瓶示例:
以下 Flask 应用使用全局变量来存储计数器号:
from flask import Flask
app = Flask(__name__)
number = 0
@app.route("/")
def counter():
global number
number += 1
return str(number)
当仅使用 1 个工作器gunicorn -w 1 server:app
时,这有效。使用多个工作进程gunicorn -w 4 server:app
很明显,number
不是共享状态,而是每个工作进程的单独状态。
相反,使用shared_dict
,该应用程序如下所示:
from flask import Flask
app = Flask(__name__)
HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)
shared_dict["number"] = 0
@app.route("/")
def counter():
with shared_lock:
shared_dict["number"] += 1
return str(shared_dict["number"])
这适用于任意数量的工人,例如gunicorn -w 4 server:app
.
你的例子对我来说有点神奇!我建议以Namespace
的形式重用multiprocessing
代码库中已有的魔法。 我试图使以下代码与spawn
服务器(即 MS Windows(兼容,但我只能访问 Linux 机器,所以无法在那里进行测试
首先引入依赖项并定义我们的自定义Manager
并注册一个方法来获取Namespace
单例:
from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy
class SharedState(BaseManager):
_shared_state = Namespace(number=0)
@classmethod
def _get_shared_state(cls):
return cls._shared_state
SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)
如果创建初始状态的成本很高,则可能需要更复杂的操作,因此应仅在需要时进行。 请注意,如果 Gunicorn 稍后启动新的工作进程,例如在由于超时而杀死一个工作进程后,则在进程启动期间初始化状态的 OPs 版本将导致所有内容重置
接下来,我定义一个函数来访问此共享状态,类似于 OP 的工作方式:
def shared_state(address, authkey):
manager = SharedState(address, authkey)
try:
manager.get_server() # raises if another server started
manager.start()
except OSError:
manager.connect()
return manager.state()
虽然我不确定我是否建议做这样的事情。 当gunicorn
启动时,它会生成许多进程,这些进程都竞相运行此代码,如果有时会出错,我也不会感到惊讶。此外,如果它碰巧杀死了服务器进程(由于例如超时(,则所有其他进程将开始失败
也就是说,如果我们想使用它,我们会做这样的事情:
ss = shared_state('server.sock', b'noauth')
ss.number += 1
这使用 Unix 域套接字(传递字符串而不是元组作为地址(来锁定它。
另请注意,这与 OP 的代码具有相同的争用条件:增加一个数字将导致该值被传输到工作进程,然后递增,并发送回服务器。 我不确定_lock
应该保护什么,但我认为它不会做太多事情