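I'm using a chord in Celery to run a callback once a group of parallel tasks has finished executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of them to return before processing the results and updating the database in the chord callback. The callback should run once all API calls have finished, regardless of their status.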
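My problem is that the callback is only called if none of the group's subtasks raised an exception. If a subtask does raise an exception, the optional error handler `on_error()` is called instead, with the string representation of the chord's `task_id`. The remaining tasks in the group keep executing, but the callback is never called.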
The following example illustrates this:
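```python
from random import randint

from celery import Celery, group

# Chords need a result backend; broker/backend settings are assumed to be configured elsewhere.
app = Celery('tasks')


@app.task
def maybe_succeed():
    divisor = randint(0, 10)  # 0 is possible, so this raises ZeroDivisionError about 1 time in 11
    return 1 / divisor


@app.task
def master_task():
    g = group([maybe_succeed.s() for _ in range(100)])
    c = g | chord_callback.s()  # piping a group into a task turns it into a chord
    return c.delay().id  # return the id string rather than the AsyncResult object itself


@app.task
def chord_callback(results):
    print('Made it here!')
```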
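In the example above, calling `master_task()` runs all the tasks in the group, but the callback is never called, because one of the `maybe_succeed()` calls will fail (unless you're very lucky!).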
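How lucky? A rough check, assuming `randint(0, 10)` is uniform over its 11 possible values so that each `maybe_succeed()` fails independently with probability 1/11: all 100 tasks succeed with probability (10/11)^100, roughly 7 in 100,000.

```python
# Each maybe_succeed() draws randint(0, 10): 11 equally likely values, one of which (0) fails.
p_single_success = 10 / 11

# Probability that all 100 independent subtasks succeed, so the chord callback runs.
p_all_succeed = p_single_success ** 100

print(f"{p_all_succeed:.2e}")
```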
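For now, I'm working around this by catching all exceptions in my equivalent of `maybe_succeed()`, so the chord never fails. I suppose that's a reasonable solution, although it feels wrong.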
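A minimal sketch of that workaround, assuming the subtasks may return a JSON-serializable status dict instead of a bare value. `never_raise` and `divide` are hypothetical names used only for illustration; in Celery you would stack `@app.task` above `@never_raise` so that the registered task is the wrapped function.

```python
import functools
from typing import Any, Callable, Dict


def never_raise(func: Callable[..., Any]) -> Callable[..., Dict[str, Any]]:
    """Hypothetical helper: make func report errors in its return value instead of raising."""

    @functools.wraps(func)  # keep __name__ so a task decorator above still names it correctly
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        try:
            return {"ok": True, "value": func(*args, **kwargs)}
        except Exception as exc:  # deliberately broad: the chord must never see a failure
            return {"ok": False, "error": f"{type(exc).__name__}: {exc}"}

    return wrapper


@never_raise
def divide(a: float, b: float) -> float:
    # Hypothetical stand-in for maybe_succeed(); fails when b == 0.
    return a / b


# What a chord callback would receive: one status dict per subtask.
results = [divide(1, d) for d in (1, 0, 4)]
succeeded = [r["value"] for r in results if r["ok"]]
failed = [r["error"] for r in results if not r["ok"]]
print(succeeded, failed)
```

The callback then receives one dict per subtask and can separate successes from failures itself, so the error information is kept rather than silently swallowed.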
So my question is: is there a way to make a Celery chord callback run regardless of the return status of its group's subtasks?
You can try calling the original callback from inside the errback:
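```python
from celery import Celery, chord

# The app is named `celery` here; broker/result backend are assumed to be configured elsewhere.
celery = Celery('tasks')


@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y


@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')


@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')


@celery.task
def on_chord_error(task_id, extra_info):
    # Called with the failed chord's task id, followed by the partial args from .s('extra info').
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    # Fire the original callback anyway, so it runs even though a subtask failed.
    callme.delay(extra_info)


@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)
```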
The output is:
```
Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task:tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
  File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
    interval=interval, no_ack=no_ack, on_interval=on_interval,
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
    self.maybe_throw(callback=callback)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
```
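I recently ran into this problem as well, and the only solution I've found so far is to monkey-patch the result backend (Redis, in my case) to stop the exception from being re-raised.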
```python
import celery
from celery import states
from celery.backends.redis import RedisBackend


def patch_celery():
    """Patch the Redis backend so a failed chord part does not abort the chord."""

    # Replacement for RedisBackend._unpack_chord_result, a private method whose
    # signature may change between Celery versions.
    def _unpack_chord_result(
        self, tup, decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        _, tid, state, retval = decode(tup)
        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)
        if state in PROPAGATE_STATES:
            # retval is an Exception: return it as a string instead of raising,
            # so the chord callback still runs.
            return '{}: {}'.format(retval.__class__.__name__, str(retval))
        return retval

    RedisBackend._unpack_chord_result = _unpack_chord_result
    return celery
```
Now you can call `patch_celery().Celery` to get the patched version. I wrote an article explaining everything.
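To illustrate what the patch changes, here is a Celery-free sketch of the same decision logic applied to hypothetical part results shaped like the `plus`/`failure` chord above. `unpack_without_raising` is a made-up name, and the `PROPAGATE_STATES` set is assumed to match Celery's (`FAILURE` and `REVOKED`).

```python
from typing import Any, List, Tuple

# Assumed to mirror celery.states.PROPAGATE_STATES; no Celery import is needed here.
PROPAGATE_STATES = frozenset({"FAILURE", "REVOKED"})


def unpack_without_raising(state: str, retval: Any) -> Any:
    """Same decision as the patched _unpack_chord_result.

    Decoding and exception_to_python() are skipped: retval is already a Python
    exception object for failed parts.
    """
    if state in PROPAGATE_STATES:
        return "{}: {}".format(retval.__class__.__name__, str(retval))
    return retval


# Hypothetical (state, result) pairs for plus(1, 1), plus(2, 2), failure(), plus(3, 3).
part_results: List[Tuple[str, Any]] = [
    ("SUCCESS", 2),
    ("SUCCESS", 4),
    ("FAILURE", ValueError("BAD")),
    ("SUCCESS", 6),
]

# The list the chord callback would receive instead of the chord failing.
callback_args = [unpack_without_raising(state, value) for state, value in part_results]
print(callback_args)
```

One trade-off of this approach: each failed part reaches the callback as a plain string, so the callback cannot tell a failure apart from a subtask that legitimately returned a string of the same form.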