我最近切换到Celery 3.0。在那之前,我一直在使用烧瓶芹菜,以便将芹菜与烧瓶相结合。尽管它有很多问题,比如隐藏一些强大的Celery功能,但它允许我使用Flask应用程序的完整上下文,尤其是Flask SQLAlchemy。
在我的后台任务中,我处理数据和SQLAlchemyORM来存储数据。Flask Celery的维护者已经放弃了对该插件的支持。插件在任务中酸洗了Flask实例,这样我就可以完全访问SQLAlchemy。
我试图在tasks.py文件中复制这种行为,但没有成功。你对如何做到这一点有什么提示吗?
更新:从那以后,我们开始使用一种更好的方法来处理应用程序的拆卸,并根据最近的flask文档中描述的模式在每个任务的基础上进行设置
分机.py
import flask
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
celery = FlaskCelery()
db = SQLAlchemy()
应用程序
from flask import Flask
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.init_app(app)
return app
一旦你以这种方式设置了你的应用程序,你就可以运行和使用芹菜,而不必在应用程序上下文中显式地运行它,因为如果必要,你的所有任务都会自动在应用程序环境中运行,而且你不必显式地担心任务后的拆卸,这是一个需要管理的重要问题(请参阅下面的其他响应)。
故障排除
那些不断获得with _celery.app.app_context(): AttributeError: 'FlaskCelery' object has no attribute 'app'
的人一定要:
- 将
celery
导入保持在app.py
文件级别。避免:
应用程序
from flask import Flask
def create_app():
app = Flask()
initiliaze_extensions(app)
return app
def initiliaze_extensions(app):
from extensions import celery, db # DOOMED! Keep celery import at the FILE level
db.init_app(app)
celery.init_app(app)
- 在
flask run
和使用之前让你的芹菜工人开始
celery worker -A app:celery -l info -f celery.log
注意app:celery
,即从app.py
加载。
您仍然可以从扩展导入以装饰任务,即from extensions import celery
。
下面的老答案,仍然有效,但不是一个干净的解决方案
我更喜欢在应用程序上下文中运行所有的芹菜,方法是创建一个单独的文件,用应用程序上下文调用celener.start()。这意味着您的任务文件不必到处都是上下文设置和拆卸。它也很适合烧瓶的"应用工厂"模式。
分机.py
from from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery
db = SQLAlchemy()
celery = Celery()
tasks.py
from extensions import celery, db
from flask.globals import current_app
from celery.signals import task_postrun
@celery.task
def do_some_stuff():
current_app.logger.info("I have the application context")
#you can now use the db object from extensions
@task_postrun.connect
def close_session(*args, **kwargs):
# Flask SQLAlchemy will automatically create new sessions for you from
# a scoped session factory, given that we are maintaining the same app
# context, this ensures tasks have a fresh session (e.g. session errors
# won't propagate across tasks)
db.session.remove()
应用程序
from extensions import celery, db
def create_app():
app = Flask()
#configure/initialize all your extensions
db.init_app(app)
celery.config_from_object(app.config)
return app
RunCelery.py
from app import create_app
from extensions import celery
app = create_app()
if __name__ == '__main__':
with app.app_context():
celery.start()
在tasks.py文件中执行以下操作:
from main import create_app
app = create_app()
celery = Celery(__name__)
celery.add_defaults(lambda: app.config)
@celery.task
def create_facet(project_id, **kwargs):
with app.test_request_context():
# your code
当然那没用。我最终在我的Flask应用程序工厂里发生了一场争论,如果Celery调用db.init_app(应用程序),就不要运行它。相反,工人会在Celery分叉后调用它。我现在在MySQL进程列表中看到了几个连接。
from extensions import db
from celery.signals import worker_process_init
from flask import current_app
@worker_process_init.connect
def celery_worker_init_db(**_):
db.init_app(current_app)
from flask import Flask
from werkzeug.utils import import_string
from celery.signals import worker_process_init, celeryd_init
from flask_celery import Celery
from src.app import config_from_env, create_app
celery = Celery()
def get_celery_conf():
config = import_string('src.settings')
config = {k: getattr(config, k) for k in dir(config) if k.isupper()}
config['BROKER_URL'] = config['CELERY_BROKER_URL']
return config
@celeryd_init.connect
def init_celeryd(conf=None, **kwargs):
conf.update(get_celery_conf())
@worker_process_init.connect
def init_celery_flask_app(**kwargs):
app = create_app()
app.app_context().push()
- 在celeryd-init处更新芹菜配置
- 使用您的烧瓶应用程序工厂初始化所有烧瓶扩展,包括SQLAlchemy扩展
通过这样做,我们能够维护每个工作者的数据库连接。
如果您想在flask上下文下运行任务,可以将Task.__call__
:子类化
class SmartTask(Task):
abstract = True
def __call__(self, *_args, **_kwargs):
with self.app.flask_app.app_context():
with self.app.flask_app.test_request_context():
result = super(SmartTask, self).__call__(*_args, **_kwargs)
return result
class SmartCelery(Celery):
def init_app(self, app):
super(SmartCelery, self).init_app(app)
self.Task = SmartTask