使用芹菜工人与SQLalchemy DB进行交互,包括从请求中了解用户



我对此进行了大量研究,包括尝试这样的答案。看来芹菜无法访问我的烧瓶应用程序的上下文。

我非常了解我的芹菜对象,什么将装饰我的任务,必须可以访问烧瓶应用程序的上下文。我确实相信应该这样做,因为我遵循本指南来创建芹菜对象。我不确定混乱是否位于我正在使用烧瓶httpauth的事实中。

这是我所拥有的。

def make_celery(app):
    celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery
app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
    items = g.user.items
    for item in items:
        print(item)

使用烧瓶运行此任务是不做的。我尝试通过击中服务器来启动此功能(授权!)。

@app.route("/item_loop")
@auth.login_required
def item_loop():
    result = loop.delay()
    return "It's running."

但是芹菜工人告诉我任务raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'",),我相信,如前所述,即使我使用了推荐的工厂模式,我的芹菜对象也没有应用程序上下文。

虽然戴夫(Dave)和格雷格(Greg)的答案中的重新计算是有效的,但他们错过的突出显示是您对芹菜任务中应用程序上下文使用的误解。

您有一个烧瓶应用程序,其中您正在使用烧瓶-Httpauth。您可能有一个verify_password处理程序,该处理程序将g.user设置为身份验证的用户。这意味着,当您处理请求时,您可以作为g.user访问用户。这一切都很好。

您还有一个或多个芹菜工人,它们是与烧瓶服务器无直接连接的单独过程。烧瓶服务器与芹菜工作过程之间的唯一通信发生在您使用的消息经纪(通常是Redis或RabbitMQ)上。

根据您的需求,芹菜工人可能需要访问烧瓶应用程序。当使用将其配置存储在app.config字典中的烧瓶扩展时,这是非常普遍的。需要这两个常见的扩展名是烧瓶 - 塞拉尔彻和瓶邮件。如果不访问app.config,芹菜任务将无法打开与数据库的连接或发送电子邮件,因为它不知道数据库和/或电子邮件服务器的详细信息。

为了让芹菜工人访问配置,公认的做法是在每个工人中创建重复的烧瓶应用程序。这些是次要应用程序,绝不连接到主瓶服务器使用的实际应用程序对象。他们的唯一目的是保留您的任务或任务使用的任何烧瓶扩展名可以访问的原始app.config字典的副本。

因此,期望在瓶装服务器中将g.user设置在芹菜任务中也可以作为g.user访问,这是无效的,仅仅是因为它们是不同的g对象,来自不同的应用程序实例。

如果您需要在芹菜任务中使用身份验证的用户,则应该做的是将user_id(通常是g.user.id)作为参数传递给您的任务。然后,在您的任务中,您可以使用此id从数据库中加载用户。希望这会有所帮助!

要从任务执行中检索用户,您可以尝试传递用户对象(如果芹菜可以腌制),或传递足够的信息以使任务可以检索用户对象(例如,用户的ID)。在后一种情况下,您的任务看起来像

@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
    user = User.query.get(user_id)
    items = user.items
    for item in items:
        print(item)

,您将通过

开始(假设您使用blask_login)
result = loop.delay(current_user.id)

正如@dave W. Smith指出的那样,而不是依靠g来检索用户,而是将用户信息作为参数传递给芹菜任务可能是更好的方法。根据应用程序上下文上的烧瓶文档,g的寿命是一个请求。由于芹菜任务是异步执行的,因此它将在您定义用户的请求中的应用程序上下文中执行。

最新更新