正在读取全球收藏.从烧瓶请求安全的deque



我有一个烧瓶应用程序,该应用程序应该在指定的路由上向用户显示长期运行函数的结果。结果将每小时左右更改。为了避免用户必须等待结果,我想将其缓存在应用程序中的某个地方,并在后台以特定的间隔(例如每小时)重新计算它,以便没有用户请求必须等待长期运行的计算功能。

我想出的解决这是如下所示,但是,我不确定这是否真的是在具有多线程甚至多个过程的WebServer(例如waitress)的生产环境中进行的。, eventletgunicorn或什么不。

要重新计算后台结果,我使用apscheduler库中的BackgroundScheduler

然后将结果在集合中左二次。Deque对象,该对象被注册为模块范围的变量(据我所知,没有更好的可能性将应用程序宽的全球范围保存在烧瓶应用程序中?!)。由于将Deque的最大尺寸设置为2,因此随着新的结果,旧结果将在Deque的右侧弹出。

现在,烧瓶视图将deque[0]返回到请求者,这应该始终是最新结果。我决定在Queue上使用deque,因为后者没有内置的可能不删除第一项的可能性。

因此,可以保证,没有用户必须等待结果,因为旧的仅在新的时就从"缓存"中消失了。

有关此的最小示例,请参见下文。在运行脚本并击中http://localhost:5000时,可以看到"工作中的缓存" - "工作完成"不应迟到10秒,再加上很短的时间以重新计算"当前时间",仍然不必必须等待工作函数的time.sleep(5)秒直到请求返回。

这是给定要求的有效实现,它也将在适用于生产的WSGI服务器设置中工作,还是应该以不同的方式完成?

from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import time
import datetime
from collections import deque
# a global deque that is filled by APScheduler and read by a Flask view
deque = deque(maxlen=2)
# a function filling the deque that is executed in regular intervals by APScheduler
def some_long_running_job():
    print('complicated long running job started...')
    time.sleep(5)
    job_finished_at = datetime.datetime.now()
    deque.appendleft(job_finished_at)
# a function setting up the scheduler
def start_scheduler():
    scheduler = BackgroundScheduler()
    scheduler.add_job(some_long_running_job,
                      trigger='interval',
                      seconds=10,
                      next_run_time=datetime.datetime.utcnow(),
                      id='1',
                      name='Some Job name'
                      )
    scheduler.start()
# a flask application
app = Flask(__name__)
# a flask route returning an item from the global deque
@app.route('/')
def display_job_result():
    current_time = datetime.datetime.now()
    job_finished_at = deque[0]
    return '''
        Current time is: {0} <br>
        Job finished at: {1}
        '''.format(current_time, job_finished_at)
# start the scheduler and flask server
if __name__ == '__main__':
    start_scheduler()
    app.run()

线程安全是不够的,如果您运行多个进程:

即使collections.deque是线程安全:

Deques支持线程安全,内存有效的附加和从Deque的任一侧附加和弹出,并且在任一方向上大致相同的O(1)性能。

来源:https://docs.python.org/3/library/collections.html#collections.deque

根据您的配置,您的Web服务器可能会在多个过程中运行多个工人,因此每个过程都有其对象的实例。


即使有一个工人,线程安全也可能还不够:

您可能已经选择了异步工人类型。异步工人不知道何时可以安全屈服,并且必须保护您的代码免受此类情况的保护:

  1. 请求的工人1读取值a并产生
  2. 请求2的工人还读取值a,写入a + 1并产生
  3. 请求1的工人写入值a + 1,即使应该是a + 1 + 1

可能的解决方案:

使用烧瓶应用程序以外的东西存储数据。这可以是数据库,在这种情况下,最好是Redis等内存数据库。或者,如果您的工作类型与multiprocessing模块兼容,则可以尝试使用multiprocessing.managers.BaseManager将您的Python对象提供给所有工作过程。

  • 全局变量线程是否安全?我如何共享请求之间的数据?
  • 我如何与多个工人一起向我的烧瓶应用程序提供共享状态,而不依赖其他软件?
  • 存储每个烧瓶会话的大数据或服务连接

最新更新