我试图将芹菜添加到我现有的烧瓶项目中。添加后,我在运行时出现了"在应用程序上下文之外的工作"错误。看来芹菜工人缺乏我的应用程序上下文。但是我不确定在这种情况下将申请的上下文传递给芹菜工人的何处。
这是我当前的结构(我尝试遵循带有蓝图和API文档的工厂模式(:
-run.py
-app
-module1
-controller.py
-model.py
-service.py
-__init__.py
-config.py
init.py
# __init__.py
import os
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
from app.config import Config
from flask_restplus import Api
from celery import Celery
cors = CORS()
db = SQLAlchemy()
api = Api()
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, include=["app.module1.service"])
def create_app(config_class = Config):
app = Flask(__name__, static_url_path='')
app.config.from_object(Config)
cors.init_app(app)
db.init_app(app)
api.init_app(app=app)
celery.conf.update(app.config)
from app.module1.controller import blueprint
from app.module1.controller import ns
app.register_blueprint(blueprint)
api.add_namespace(ns)
return app
run.py
from app import create_app
app = create_app()
if __name__ == '__main__':
app.run(threaded=True, debug=True)
service.py
from app import db, celery
@celery.task(bind=True)
def service1(self):
# do somethigng & return
for Controller.py
from flask import Blueprint
from flask_restplus import Api, Resouce
blueprint = Blueprint('service', __name__)
apis = Api(app = blueprint)
ns = apis.namespace('service', 'service description')
@ns.route("/")
class SomeList(Resource):
def get(self):
service1.apply_async()
# return
我认为混乱是基于您试图将应用程序上下文"传递"给芹菜工人的事实。实际上,烧瓶过程无法将上下文传递给工人,因为它们是不同的过程。芹菜工作过程需要通过调用create_app()
来创建自己的烧瓶应用程序实例,以便在需要时推动自己的应用程序上下文。
因此,在您的service1
任务中:
from app import db, celery, create_app
@celery.task(bind=True)
def service1(self):
app = create_app()
with app.app_context():
# do somethigng & return
为了使其更有效,您可以创建一个由所有任务共享的单个全局app
:
from app import db, celery, create_app
app = create_app()
@celery.task(bind=True)
def service1(self):
with app.app_context():
# do somethigng & return