我试图通过查询每个任务状态来获得任务链的进度。但是当通过id检索链时,我得到一些行为不同的对象。
在tasks.pyfrom celery import Celery
celery = Celery('tasks')
celery.config_from_object('celeryconfig')
def unpack_chain(nodes):
while nodes.parent:
yield nodes.parent
nodes = nodes.parent
yield nodes
@celery.task
def add(num, num2):
return num + num2
In [43]: from celery import chain
In [44]: from tasks import celery, add, unpack_chain
In [45]: c = chain(add.s(3,3), add.s(10).set(countdown=100))
In [46]: m = c.apply_async()
In [47]: a = celery.AsyncResult(m.id)
In [48]: a == m
Out[48]: True
In [49]: a.id == m.id
Out[49]: True
In [50]: [t.status for t in list(unpack_chain(a))]
Out[50]: ['PENDING']
In [51]: [t.status for t in list(unpack_chain(m))]
Out[51]: ['PENDING', 'SUCCESS']
在Redis下使用Python 2.7.3和芹菜3.0.19。
可以看到50 &51, celery.AsyncResult
返回的值与原链不同。
如何通过链id获取原始链任务列表?
就像@Hernantz说的,你不能从任务ID中恢复父链,你必须遍历你的队列,这可能是可能的,也可能是不可能的,这取决于你使用什么作为代理。
但是如果您有最后一个任务id来进行查找,那么您就有了链,您只需要存储所有任务id并在需要检查其状态时重建链。您可以使用以下函数:
def store(node):
id_chain = []
while node.parent:
id_chain.append(node.id)
node = node.parent
id_chain.append(node.id)
return id_chain
def restore(id_chain):
id_chain.reverse()
last_result = None
for tid in id_chain:
result = celery.AsyncResult(tid)
result.parent = last_result
last_result = result
return last_result
当你第一次从chain获得AsyncResult时调用store。在上面调用restore会得到一个AsyncResult
的链表,就像chain给你的一样。