我在我的数据库中保留了对链的引用。
from tasks import t1, t2, t3
from celery import chain
res = chain(t1.s(123)|t2.s()|t3.s())()
res.get()
如何将其他任务附加到此特定链?
res.append(t2.s())
我的目标是确保链的执行顺序与我在代码中指定的顺序相同。如果任务在我的链中失败,则不会执行以下任务。
要知道我在指定队列中使用超大任务。
所有信息都包含在消息中。
消息可以在传输中,也可能在世界的另一端,也可能由中间处理器使用。因此,无法在发送消息后对其进行修改。
见 http://docs.celeryproject.org/en/latest/userguide/tasks.html#state
我的目标是确保链的执行顺序与我在代码中指定的顺序相同。和 如果任务在我的链中失败,则不会执行以下任务。
您可以确定,订单是作为消息的一部分发送的,并且不会继续如果任何任务失败。
现在,如果您真的希望能够在运行时添加任务,那么您可以存储数据库中的信息,并让任务本身检查并调用新任务。但是,这样做时存在一些挑战:
1)链中的第一个任务如果成功,将调用下一个任务, 然后下一个任务将调用之后的下一个任务,依此类推。
2) 如果将任务添加到此流程中,如果第一个任务已经执行,会发生什么情况? 还是第二个,还是第三个?
因此,正如您可能猜到的那样,这将需要一些繁重的同步才能工作。
我想一个更简单的解决方案是创建一个等待一个任务完成的任务然后应用回调:
from celery import subtask
from celery.result import from_serializable
@app.task(bind=True)
def after_task(self, result, callback, errback=None):
result = from_serializable(result)
if not result.ready():
raise self.retry(countdown=1)
if task.successful():
subtask(callback).delay(result.get())
else:
if errback:
subtask(errback)()
def add_to_chain(result, callback, errback=None):
callback = callback.clone() # do not modify caller
new_result = callback.freeze() # sets id for callback, returns AsyncResult
new_result.parent = result
after_task.delay(result.serializable(), callback, errback)
return new_result
然后你可以像这样使用它:
from tasks import t1, t2, t3
res = (t1.s(123) | t2.s() | t3.s())()
res = add_to_chain(t2.s())
笔记:
bind=True
是即将推出的 3.1 版本中的新功能,适用于旧版本您必须删除 self 参数并使用 current_task.retry
(获取此from celery import current_task
)。
Signature.freeze
3.1 中也是新功能,可以执行在旧版本中,您可以使用:
from celery import uuid
def freeze(sig, _id=None):
opts = sig.options
try:
tid = opts['task_id']
except KeyError:
tid = opts['task_id'] = _id or uuid()
return sig.AsyncResult(tid)