我有两个文件:
app.py
from flask import Flask
from flask_restful import Api
from celery import Celery
from resources.item import Item, ItemList, ItemInsert
from db import db
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = ""
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config['CELERY_BROKER_URL'] = ''
app.config['CELERY_RESULT_BACKEND'] = ''
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
app.secret_key = ""
api = Api(app)
@app.before_first_request
def create_tables():
db.create_all()
api.add_resource(ItemList,"/items")
api.add_resource(Item,"/item/<int:id>")
api.add_resource(ItemInsert,"/items")
@celery.task
def update_row(data):
pass
if __name__ == "__main__":
db.init_app(app)
app.run(debug=True,port=5000)
item.py
from flask_restful import Resource, reqparse
from flask import request
from models.item import ItemModel
class ItemInsert(Resource):
def post(self):
file_task = update_row.apply_async((data,), countdown=3)
return item.json(), 201
正如你在app.py
中看到的,我从item.py
导入了类,但是现在我的芹菜(任务)函数调用即update_row
从item.py
被挂起,因为我不能从app.py导入,因为它会导致循环导入。有解决办法吗?
使用简单的项目,您可以在app.py
内实现您现在正在做的任务。但是对于更复杂的项目,最好将任务定义移动到一个单独的包中,这样可以减少循环导入。
一样:
App和芹菜配置
**# app.py**
# App & celery
# ...
任务定义
**# tasks.py**
from project_name.app import celery
@celery.task
def update_row(data):
pass
/p> 将任务分离到另一个包中( 但是如果你仍然想使用当前的方法,为了防止循环导入,你可以在调用API时动态导入它:**# resources/item.py**
from project_name.tasks import update_row
# ...
tasks
包,由芹菜自动发现)可以帮助您防止循环导入,也有利于维护代码。**# resources/item.py**
# ...
class ItemInsert(Resource):
def post(self):
from project_name.app import update_row
file_task = update_row.apply_async((data,), countdown=3)
return item.json(), 201
celery_app.task
是一个装饰器,所以只是一个普通的Python函数。它的工作方式如下:它接受你的函数,在芹菜应用程序中注册它,并返回一个包装器对象,方法delay
, apply_async
等。并且您总是可以通过名称从celery_app.tasks
字典中获得已注册的任务。避免循环导入的另一个技巧是将celery_app
引用存储为flask_app
的属性,并且在请求上下文中,您总是可以从flask.current_app
获取当前的flask应用程序。 app.py
from tasks import register_tasks
...
app = Flask(__name__)
...
app.celery = Celery(app.name, ...)
register_tasks(app.celery)
tasks.py
def update_row(data):
pass
def register_tasks(celery_app):
celery_app.task(update_row, name="update_row")
views.py
from flask import current_app
class ItemInsert(Resource):
def post(self):
update_row = current_app.celery.tasks["update_row"]
file_task = update_row.apply_async((data,), countdown=3)
return item.json(), 201
UPD:实际上最规范的方法是使用任务自动发现:
myapp/tasks.py
from celery import shared_task
@shared_task
def update_row(data):
pass
myapp/app.py
celery_app = Celery(...)
celery_app.set_default()
celery_app.autodiscover_tasks(["myapp"], force=True)
myapp/views.py
from .tasks import update_row
def index_view():
update_row.delay(...)