Even though I am using the Flask application context, I get the following error when running a Celery task:
raised unexpected: RuntimeError('Working outside of application context.\n\nThis typically means that you attempted to use functionality that needed\nto interface with the current application object in some way. To solve\nthis, set up an application context with app.app_context(). See the\ndocumentation for more information.',)
Traceback (most recent call last):
  File "/usr/lib/python3.6/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/lib/python3.6/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/app/example.py", line 172, in start_push_task
    }, data=data)
  File "/app/push.py", line 65, in push
    if user and not g.get('in_celery_task') and 'got_user' not in g:
  File "/usr/lib/python3.6/site-packages/werkzeug/local.py", line 347, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/lib/python3.6/site-packages/werkzeug/local.py", line 306, in _get_current_object
    return self.__local()
  File "/usr/lib/python3.6/site-packages/flask/globals.py", line 44, in _lookup_app_object
    raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed
to interface with the current application object in some way. To solve
this, set up an application context with app.app_context(). See the
documentation for more information.
Is there a way to fix this?
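For me, the problem was that I had import celery instead of from app import celery. A bare import celery binds the name to the Celery library package rather than to the instance configured by make_celery, so tasks defined through it presumably never run inside ContextTask.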
Here is more of my setup code, for anyone who stumbles upon this in the future:
app.py
from celery import Celery, Task
from flask import g

# app is the Flask application object, created earlier in this module (not shown).

def make_celery(app):
    app.config['broker_url'] = 'amqp://rabbitmq:rabbitmq@rabbit:5672/'
    app.config['result_backend'] = 'rpc://rabbitmq:rabbitmq@rabbit:5672/'
    celery = Celery(app.import_name, backend=app.config['result_backend'], broker=app.config['broker_url'])
    celery.conf.update(app.config)

    class ContextTask(Task):
        abstract = True

        # Run every task inside a Flask context so that g is available.
        def __call__(self, *args, **kwargs):
            with app.test_request_context():
                g.in_celery_task = True
                res = self.run(*args, **kwargs)
                return res

    celery.Task = ContextTask
    celery.config_from_object(__name__)
    celery.conf.timezone = 'UTC'
    return celery

celery = make_celery(app)
In another file:
from app import celery