SqlAlchemy和芹菜结构在python中的问题



我有两个文件:

app.py

from flask import Flask
from flask_restful import Api
from celery import Celery
from resources.item import Item, ItemList, ItemInsert
from db import db
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = ""
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config['CELERY_BROKER_URL'] = ''
app.config['CELERY_RESULT_BACKEND'] = ''
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
app.secret_key = ""
api = Api(app)
@app.before_first_request
def create_tables():
    db.create_all()
api.add_resource(ItemList,"/items")
api.add_resource(Item,"/item/<int:id>")
api.add_resource(ItemInsert,"/items")
@celery.task
def update_row(data):
    pass
if __name__ == "__main__":
    db.init_app(app)
    app.run(debug=True,port=5000)

item.py

from flask_restful import Resource, reqparse
from flask import request
from models.item import ItemModel
class ItemInsert(Resource):
    def post(self):
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

正如你在app.py中看到的,我从item.py导入了类,但是现在我的芹菜(任务)函数调用即update_rowitem.py被挂起,因为我不能从app.py导入,因为它会导致循环导入。有解决办法吗?

使用简单的项目,您可以在app.py内实现您现在正在做的任务。但是对于更复杂的项目,最好将任务定义移动到一个单独的包中,这样可以减少循环导入。

一样:

App和芹菜配置

**# app.py**
# App & celery
# ...

任务定义

**# tasks.py**
from project_name.app import celery
@celery.task
def update_row(data):
    pass

/p>

**# resources/item.py**
from project_name.tasks import update_row
# ...

将任务分离到另一个包中(tasks包,由芹菜自动发现)可以帮助您防止循环导入,也有利于维护代码。


但是如果你仍然想使用当前的方法,为了防止循环导入,你可以在调用API时动态导入它:

**# resources/item.py**
# ...
class ItemInsert(Resource):
    def post(self):
        from project_name.app import update_row
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

celery_app.task是一个装饰器,所以只是一个普通的Python函数。它的工作方式如下:它接受你的函数,在芹菜应用程序中注册它,并返回一个包装器对象,方法delay, apply_async等。并且您总是可以通过名称从celery_app.tasks字典中获得已注册的任务。避免循环导入的另一个技巧是将celery_app引用存储为flask_app的属性,并且在请求上下文中,您总是可以从flask.current_app

获取当前的flask应用程序。 app.py

from tasks import register_tasks
...
app = Flask(__name__)
...
app.celery = Celery(app.name, ...)
register_tasks(app.celery)

tasks.py

def update_row(data):
    pass
def register_tasks(celery_app):
    celery_app.task(update_row, name="update_row")

views.py

from flask import current_app
class ItemInsert(Resource):
    def post(self):
        update_row = current_app.celery.tasks["update_row"]
        file_task = update_row.apply_async((data,), countdown=3)
        return item.json(), 201

UPD:实际上最规范的方法是使用任务自动发现:

myapp/tasks.py

from celery import shared_task
@shared_task
def update_row(data):
    pass

myapp/app.py

celery_app = Celery(...)
celery_app.set_default()
celery_app.autodiscover_tasks(["myapp"], force=True)

myapp/views.py

from .tasks import update_row
def index_view():
    update_row.delay(...)

相关内容

  • 没有找到相关文章

最新更新