我正在尝试使用Gunicorn构建一个Flask应用程序来服务并发请求。就其价值而言,上下文是一个自带容器 Sagemaker 应用程序。
问题是我需要应用程序定期检查更新。所以我想为此实现一个线程。下面是一些带有更新线程的 Flask 代码的最小示例。
server.py
from flask import Flask
import time, threading
app = Flask(__name__)
message = True
def update():
while True:
message = not message
time.sleep(10)
@app.route("/")
def hello():
global message
return message
update_thread = threading.Thread(target=update)
if __name__ == "__main__":
update_thread.start()
app.run()
update_thread.join()
然后我用枪角兽启动:
gunicorn -k gevent -b unix:/tmp/gunicorn.sock -w 4 server:app
也许不出所料,更新线程不会启动,因为__main__
部分永远不会执行。
问题:如何在带有 Gunicorn 的 Flask 应用程序中使用更新线程(或类似结构(?
看起来可以使用Flask-APScheduler
完成此操作,如下所示:
pip install flask_apscheduler
server.py
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
app = Flask(__name__)
message = True
def update():
global message
message = not message
scheduler = BackgroundScheduler()
scheduler.add_job(func=update,trigger="interval",seconds=10)
scheduler.start()
# shut down the scheduler when exiting the app
atexit.register(scheduler.shutdown)
@app.route("/")
def hello():
global message
return message
if __name__ == "__main__":
app.run()
然后像往常一样启动gunicorn -k gevent -b unix:/tmp/gunicorn.sock -w 4 server:app