如何在不依赖其他软件的情况下为具有多个工作线程的 Flask 应用提供共享状态?



我想为与多个工作线程(即多个进程(一起运行的 Flask 应用程序提供共享状态。

引用有关此主题的类似问题的答案:

不能使用全局变量来保存此类数据。[...]使用 Flask 外部的数据源来保存全局数据。数据库、memcached 或 redis 都是适当的单独存储区域,具体取决于您的需求。

(来源:全局变量线程在烧瓶中安全吗?如何在请求之间共享数据?

我的问题是关于如何在 Flask "外部"提供数据的建议的最后一部分。目前,我的 Web 应用程序非常小,我想避免对其他程序的要求或依赖。如果我不想在后台运行 Redis 或其他任何内容,而是使用 Web 应用程序的 Python 代码提供所有内容,我有什么选择?

如果 Web 服务器的工作线程类型与multiprocessing模块兼容,则可以使用multiprocessing.managers.BaseManager为 Python 对象提供共享状态。一个简单的包装器可能如下所示:

from multiprocessing import Lock
from multiprocessing.managers import AcquirerProxy, BaseManager, DictProxy
def get_shared_state(host, port, key):
shared_dict = {}
shared_lock = Lock()
manager = BaseManager((host, port), key)
manager.register("get_dict", lambda: shared_dict, DictProxy)
manager.register("get_lock", lambda: shared_lock, AcquirerProxy)
try:
manager.get_server()
manager.start()
except OSError:  # Address already in use
manager.connect()
return manager.get_dict(), manager.get_lock()

您可以将数据分配给shared_dict,使其可跨流程访问:

HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)
shared_dict["number"] = 0
shared_dict["text"] = "Hello World"
shared_dict["array"] = numpy.array([1, 2, 3])

但是,您应该注意以下情况:

  • 使用shared_lock防止在覆盖shared_dict中的值时出现争用条件。(参见下面的烧瓶示例。
  • 没有数据持久性。如果重新启动应用,或者如果主(第一个(BaseManager进程死亡,则共享状态将消失。
  • 通过这种简单的BaseManager实现,您无法直接编辑shared_dict中的嵌套值。例如,shared_dict["array"][1] = 0不起作用。您必须编辑副本,然后将其重新分配给字典键。

烧瓶示例:

以下 Flask 应用使用全局变量来存储计数器号:

from flask import Flask
app = Flask(__name__)
number = 0
@app.route("/")
def counter():
global number
number += 1
return str(number)

当仅使用 1 个工作器gunicorn -w 1 server:app时,这有效。使用多个工作进程gunicorn -w 4 server:app很明显,number不是共享状态,而是每个工作进程的单独状态。

相反,使用shared_dict,该应用程序如下所示:

from flask import Flask
app = Flask(__name__)
HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)
shared_dict["number"] = 0
@app.route("/")
def counter():
with shared_lock:
shared_dict["number"] += 1
return str(shared_dict["number"])

这适用于任意数量的工人,例如gunicorn -w 4 server:app.

你的例子对我来说有点神奇!我建议以Namespace的形式重用multiprocessing代码库中已有的魔法。 我试图使以下代码与spawn服务器(即 MS Windows(兼容,但我只能访问 Linux 机器,所以无法在那里进行测试

首先引入依赖项并定义我们的自定义Manager并注册一个方法来获取Namespace单例:

from multiprocessing.managers import BaseManager, Namespace, NamespaceProxy
class SharedState(BaseManager):
_shared_state = Namespace(number=0)
@classmethod
def _get_shared_state(cls):
return cls._shared_state
SharedState.register('state', SharedState._get_shared_state, NamespaceProxy)

如果创建初始状态的成本很高,则可能需要更复杂的操作,因此应仅在需要时进行。 请注意,如果 Gunicorn 稍后启动新的工作进程,例如在由于超时而杀死一个工作进程后,则在进程启动期间初始化状态的 OPs 版本将导致所有内容重置

接下来,我定义一个函数来访问此共享状态,类似于 OP 的工作方式:

def shared_state(address, authkey):
manager = SharedState(address, authkey)
try:
manager.get_server()  # raises if another server started
manager.start()
except OSError:
manager.connect()
return manager.state()

虽然我不确定我是否建议做这样的事情。 当gunicorn启动时,它会生成许多进程,这些进程都竞相运行此代码,如果有时会出错,我也不会感到惊讶。此外,如果它碰巧杀死了服务器进程(由于例如超时(,则所有其他进程将开始失败

也就是说,如果我们想使用它,我们会做这样的事情:

ss = shared_state('server.sock', b'noauth')
ss.number += 1

这使用 Unix 域套接字(传递字符串而不是元组作为地址(来锁定它。

另请注意,这与 OP 的代码具有相同的争用条件:增加一个数字将导致该值被传输到工作进程,然后递增,并发送回服务器。 我不确定_lock应该保护什么,但我认为它不会做太多事情

相关内容

  • 没有找到相关文章

最新更新