我查看了所有相关问题,但找不到我想要的内容。我有一个烧瓶应用程序app.py
其中还包括芹菜任务
from flask import Flask, request, current_app
from celery import Celery
from mongoalchemy.session import Session
from PIL import Image
from model import the_data
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
celery = Celery('app', broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def process_image():
with app.app_context():
session = Session.connect('mydb')
session.clear_collection(the_data)
image = Image.open(request.files['file'])
### do something ###
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
if 'file' in request.files:
process_image.delay()
return 'Processing image...'
if __name__ == '__main__':
app.run(debug=True, port=8008)
简而言之,app.py
将接收通过requests.post
从其他脚本发送的图像文件,并将其传递到队列以使用芹菜进行处理。
我使用app.app_context()
作为我发现的RuntimeError: Working outside of request context
的解决方案,但不幸的是,即使在我编辑了代码之后,错误仍然存在。
我应该怎么做才能解决这种类型的错误?
谢谢你的帮助。
看起来你可以通过将 *args 和 **kwargs 作为参数包含在你调用delay
来传递给你的任务。将文件传递给process_image
函数。这不起作用的原因是您在上下文之外访问请求。一旦return 'Processing image...'
执行,该上下文就会关闭。
编辑:基本上,从我链接到的文档来看,您可以向process_image
函数添加一个参数,并在delay
调用中将其传递给它。这是未经测试的,因此可能需要一些调整。
@celery.task
def process_image(file=None):
if file is None:
return False
image = Image.open(file)
### do something ###
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
if 'file' in request.files:
process_image.delay(request.files['file'])
return 'Processing image...'