使用芹菜构建一堆顺序任务的最佳方法是什么?我的代码有一堆独立的任务(所以它们都可以是不可变的签名),但是如果其中一个任务抛出异常,我想停止这个序列。
我一直在寻找解决这个问题的方法,但是我卡住了。我们使用的是芹菜3.1.12 + RabbitMQ。
首先,我们使用一个和弦来声明头任务成功,以便发生回调。它工作得很好,只是我们需要在头文件中添加更多的任务。
所以我试着在和弦中做一个链。这也可以工作,但是和弦挂起PENDING,因为当子任务引发异常时链不会退出。
一个人为的例子:
@celery.task
def bite(food):
if food == 'salad':
raise TypeError('Throwing up. I hate {}'.format(food))
print "bite {}...".format(food)
return True
@celery.task
def chew(food):
print "chewing {}...".format(food)
return True
@celery.task
def swallow(food):
print "swallowing {}...".format(food)
return True
@celery.task
def chain_in_chord(food):
return chord(
chain(
bite.si(food), chew.si(food)
),
swallow.si(food)
).delay()
如果food=salad, bite子任务将抛出异常。链条的其余部分不发生,这是我想要的。但是整个和弦被卡在PENDING状态,因为链被卡在PENDING状态并且不会退出。
>>> res = foo.chain_in_chord('salad')
>>> res.status
'PENDING'
所以我需要:
- 找出一种方法来中止链并重新引发异常链条失效
- 或者,找到一种方法来指定多个子任务和弦的标题(我似乎做不到)。
在线搜索,chain的行为显然是预期的-所以你必须遍历asyncResult的每个父状态。我更喜欢一种机制,整个事情中止并重新引发异常/跟踪……像和弦一样,但可以选择添加多个子任务。
任何反馈将不胜感激。谢谢。
最近有人指出Celery-tasktree。试玩了一下,看起来很有希望。
上面的任务(咬,咀嚼,吞咽)可以按顺序实现,将一个任务连接到另一个任务。它也可以用香草芹菜来完成,但这里的语法似乎更干净。
def eat_tree(food):
tree = TaskTree()
bite_task = tree.add_task(bite, args=[food])
chew_task = bite_task.add_task(chew, args=[food])
swallow_task = chew_task.add_task(swallow, args=[food])
return tree.apply_async()
结果是:
>>> res = foo.eat_tree('steak')
>>> res
<TaskSetResult: 62c392ce-5ac2-4bd7-89f9-c1e004af1e56 [1b971232-2341-474b-897a-e5caab609eeb]>
>>> res.get()
[True]
>>> dir(res)
['__class__', '__delattr__', '__dict__', '__doc__', '__eq__', '__format__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__iter__', '__len__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_args__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_failed_join_report', 'add', 'app', 'as_tuple', 'backend', 'children', 'clear', 'completed_count', 'delete', 'discard', 'failed', 'forget', 'get', 'id', 'iter_native', 'iterate', 'itersubtasks', 'join', 'join_native', 'maybe_reraise', 'parent', 'ready', 'remove', 'restore', 'results', 'revoke', 'save', 'serializable', 'subtasks', 'successful', 'supports_native_join', 'taskset_id', 'total', 'update', 'waiting']
>>> res.results[0]
<AsyncResult: 1b971232-2341-474b-897a-e5caab609eeb>
>>> res.results[0].status
u'SUCCESS'
>>> res.successful()
True
>>> res.taskset_id
'62c392ce-5ac2-4bd7-89f9-c1e004af1e56'
>>>
如果其中一个子任务抛出异常,Tasktree抛出异常并将该树标记为FAILURE(这是我想要的),而不是PENDING(如常规的芹菜)。
>>> res = foo.eat_tree('salad')
>>> res.get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/paulchoi/env/isd-python/lib/python2.7/site-packages/celery/result.py", line 574, in get
interval=interval, callback=callback, no_ack=no_ack)
File "/Users/paulchoi/env/isd-python/lib/python2.7/site-packages/celery/result.py", line 687, in join_native
raise value
celery.backends.base.Exception: Throwing up. I hate salad
>>> res.successful()
False
>>> res.results[0].status
u'FAILURE'
>>>