运行时间相对较长的任务委派给在另一台服务器上单独运行的芹菜工人。
但是,结果被添加回关系数据库(表根据task_descr.id
作为键更新,见下文),worker 使用 ignore_result
。
从烧瓶应用程序请求的任务:
task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments])
问题是在 Flask 端的事务尚未关闭时请求任务。这会导致争用条件,因为有时芹菜工人会在 Flask 应用中的事务结束之前完成任务。
仅在成功交易后发送任务的正确方法是什么?
还是工作人员应该在尝试条件UPDATE
之前检查task_descr.id
可用性并重试任务(这感觉太复杂了)?
提交某种类型的模型后 Answer to Run 函数讨论了类似的情况,但这里的任务发送是显式的,因此无需侦听某些模型中的更新/插入。
一种方法是每个请求后请求回调,感谢Armin Ronacher:
from flask import g
def after_this_request(func):
if not hasattr(g, 'call_after_request'):
g.call_after_request = []
g.call_after_request.append(func)
return func
@app.after_request
def per_request_callbacks(response):
for func in getattr(g, 'call_after_request', ()):
response = func(response)
return response
在我的情况下,用法是嵌套函数的形式:
task_desc = ...
attachments = ...
#...
@after_this_request
def send_mytask(response):
if response.status_code in {200, 302}:
task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments])
return response
不理想,但有效。我的任务仅用于成功服务的请求,所以我不在乎 500 或其他错误条件。