芹菜使用默认代理而不是 reddis.烧瓶+芹菜+工厂图案



最接近的工作答案是:如何在 Celery 任务中使用 Flask-SQLAlchemy

我把这个问题针对的是实际使用蟒蛇、烧瓶、工厂图案和芹菜的人。Python 是 2.7,其他是今天的最新版本。

我试图避免循环依赖并以一种轻率的方式做,

我已经浏览了 10 页的谷歌和所有可能的解决方案,但我无法解决这个问题。

 ~/git/project celery -A app  worker --loglevel=info   

芹菜仍在连接到:

[2017-11-10 16:08:12,208: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.

在 32.00 秒后重试...

尽管尝试启动该应用程序

应用/扩展程序.py

from flask.ext.marshmallow import Marshmallow
from flask.ext.sqlalchemy import SQLAlchemy
from flask_mail import Mail
import flask
from celery import Celery

class FlaskCelery(Celery):
    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()
        if 'app' in kwargs:
            self.init_app(kwargs['app'])
    def patch_task(self):
        TaskBase = self.Task
        _celery = self
        class ContextTask(TaskBase):
            abstract = True
            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)
        self.Task = ContextTask
    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)
        print self._conf['broker_url']

celery = FlaskCelery()
db = SQLAlchemy()
ma = Marshmallow()
mail = Mail()

!!!!打印self._conf['broker_url']:redis://localhost:6379/0

应用/初始化.py

from flask import Flask, render_template
from app.extensions import db, ma, mail, celery
from celerytasks import save_mailbox_items, sumf
from config import config
from utils import encoding_utils

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    # SQLAlchemy configuration
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://...'
    # Celery configuration
    app.config['BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['broker_url'] = 'redis://localhost:6379/0'
    app.config['celery_broker_url'] = 'redis://localhost:6379/0'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    register_extensions(app)
    return app

def register_extensions(app):
    db.init_app(app)
    with app.app_context():
        db.create_all()
    ma.init_app(app)
    mail.init_app(app)
    celery.init_app(app)
    from .api_v1 import api as api_v1_blueprint
    app.register_blueprint(api_v1_blueprint, url_prefix='/api/v1')
    @app.route('/', methods=['GET'])
    def index():
        return render_template('index.html')

./manager.py

import os
from flask.ext.script import Manager
from app import create_app
app = create_app(os.getenv('APP_CONFIG', 'default'))
manager = Manager(app)

@manager.shell
def make_shell_context():
    return dict(app=app)

if __name__ == '__main__':
    manager.run()

当您运行芹菜工作器时,它将使用创建

celery = FlaskCelery()

但是因为它没有接收 Flask 应用程序作为参数,所以您永远不会通过 self.init_app(kwargs['app'](,因此它将使用默认配置。

有几个选项可以在这里解决此问题:

  • 实例化 FlaskCelery 对象并在执行此操作时传递 Flask 实例

  • 在 FlaskCelery 类中,如果未在构造函数中传递任何参数,则在 init 函数中实例化 flask 应用。

对于最新一点,这将给出类似的东西

class FlaskCelery(Celery): 
    def __init__(self, *args, **kwargs):
        super(FlaskCelery, self).__init__(*args, **kwargs)
        self.patch_task()
        if 'app' in kwargs:
            self.init_app(kwargs['app'])
         else:
            self.init_app(create_app(os.getenv('APP_CONFIG', 'default')))
     def patch_task(self):
        TaskBase = self.Task
        _celery = self
        class ContextTask(TaskBase):
            abstract = True
            def __call__(self, *args, **kwargs):
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                else:
                    with _celery.app.app_context():
                        return TaskBase.__call__(self, *args, **kwargs)
        self.Task = ContextTask
    def init_app(self, app):
        self.app = app
        self.config_from_object(app.config)
        print self._conf['broker_url']

最新更新