最接近的工作答案是:如何在 Celery 任务中使用 Flask-SQLAlchemy
我把这个问题针对的是实际使用蟒蛇、烧瓶、工厂图案和芹菜的人。Python 是 2.7,其他是今天的最新版本。
我试图避免循环依赖并以一种轻率的方式做,
我已经浏览了 10 页的谷歌和所有可能的解决方案,但我无法解决这个问题。
~/git/project celery -A app worker --loglevel=info
芹菜仍在连接到:
[2017-11-10 16:08:12,208: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
在 32.00 秒后重试...
尽管尝试启动该应用程序
应用/扩展程序.py
from flask.ext.marshmallow import Marshmallow
from flask.ext.sqlalchemy import SQLAlchemy
from flask_mail import Mail
import flask
from celery import Celery
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
print self._conf['broker_url']
celery = FlaskCelery()
db = SQLAlchemy()
ma = Marshmallow()
mail = Mail()
!!!!打印self._conf['broker_url']:redis://localhost:6379/0
应用/初始化.py
from flask import Flask, render_template
from app.extensions import db, ma, mail, celery
from celerytasks import save_mailbox_items, sumf
from config import config
from utils import encoding_utils
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
# SQLAlchemy configuration
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://...'
# Celery configuration
app.config['BROKER_URL'] = 'redis://localhost:6379/0'
app.config['broker_url'] = 'redis://localhost:6379/0'
app.config['celery_broker_url'] = 'redis://localhost:6379/0'
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
register_extensions(app)
return app
def register_extensions(app):
db.init_app(app)
with app.app_context():
db.create_all()
ma.init_app(app)
mail.init_app(app)
celery.init_app(app)
from .api_v1 import api as api_v1_blueprint
app.register_blueprint(api_v1_blueprint, url_prefix='/api/v1')
@app.route('/', methods=['GET'])
def index():
return render_template('index.html')
./manager.py
import os
from flask.ext.script import Manager
from app import create_app
app = create_app(os.getenv('APP_CONFIG', 'default'))
manager = Manager(app)
@manager.shell
def make_shell_context():
return dict(app=app)
if __name__ == '__main__':
manager.run()
当您运行芹菜工作器时,它将使用创建
celery = FlaskCelery()
但是因为它没有接收 Flask 应用程序作为参数,所以您永远不会通过 self.init_app(kwargs['app'](,因此它将使用默认配置。
有几个选项可以在这里解决此问题:
实例化 FlaskCelery 对象并在执行此操作时传递 Flask 实例
在 FlaskCelery 类中,如果未在构造函数中传递任何参数,则在 init 函数中实例化 flask 应用。
对于最新一点,这将给出类似的东西
class FlaskCelery(Celery):
def __init__(self, *args, **kwargs):
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
else:
self.init_app(create_app(os.getenv('APP_CONFIG', 'default')))
def patch_task(self):
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
self.app = app
self.config_from_object(app.config)
print self._conf['broker_url']