我正在使用Flask-SQLAlchemy,Celery和uWSGI。
我知道 Flask-SQLAlchemy 会自动为您管理会话。我不确定这如何与 Celery 工作人员一起工作,但似乎当我第二次运行任务时,我收到以下错误:数据库错误:(psycopg2。数据库错误)服务器意外关闭了连接。
下面介绍了如何创建应用上下文和芹菜任务:
def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_BACKEND'],
broker=app.config['CELERY_BROKER_URL'],
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
似乎工作人员使用相同的数据库连接,并且在任务完成后该连接没有补充?
可能与以下问题有关?
我不确定如何正确设置工人或芹菜,以便他们使用与数据库的新连接。
好的。我想通了,对于使用应用程序上下文的每个进程,都必须使用新的应用程序上下文。以前,在我的app/__init__.py
中,我只是像这样全局创建应用程序:
from flask import Flask
app = Flask(__name__)
然后,我将我的应用程序更改为使用此模式中的create_app
现在,我的 tasks.py 如下所示:
from myapp import create_app
from celery import Celery
def make_celery(app=None):
app = app or create_app()
celery = Celery(
app.import_name,
backend=app.config['CELERY_BACKEND'],
broker=app.config['CELERY_BROKER_URL'],
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery()
确保在create_app中调用db.init_app(应用程序)。