我在Redis代理和后端使用以下设置:
chord([A, A, A, ...])(B)
- 任务A做一些检查。它使用
AbortableTask
作为基础,并定期检查task.is_aborted()
标志 - 任务B通知用户计算结果
用户有可能中止A任务。不幸的是,在所有任务A实例上调用AbortableAsyncResult(task_a_id).abort()
时,只有活动实例被中止。工作人员尚未接收到的任务的状态更改为ABORTED
,但它们仍在处理中,并且is_aborted()
标志返回False。
当然,我可以revoke()
挂起的任务,而不是abort()
,但问题是在这种情况下,和弦主体(任务B)不再执行。
如何在确保任务B运行的同时停止所有挂起和正在运行的任务A实例?
只需获取A
所有实例的id列表并停止它们。
考虑一下这个简单的和弦
from celery import chord
my_chord = chord(a.si() for i in range(300))(b.si())
现在,您可以使用从my_chord
实例中获得子任务列表(a
任务的所有实例)
for taks in my_chord.parent.subtasks:
print(task.id)
现在,您可以对这些任务实例执行任何您想执行的操作。例如,您可以撤销所有的,而不管它们的当前状态如何。
from celery.task.control import revoke
for task in my_chord.parent.subtasks:
revoke(task.id, terminate=True)
默认情况下,revoke
只杀死挂起的任务。但是,如果将terminate=True
传递给它,它也会杀死正在执行的任务。
此外,chord的回调函数将在其所有子任务成功执行后调用。由于您正在取消chord的子任务,因此不会调用回调函数,并且chord任务将导致失败。因此,您必须重试回调任务。
与其将任务本身编为和弦,不如考虑让和弦任务监视A任务。我的意思是,和弦将包含任务,这些任务每隔一段时间就会检查正在运行的任务(A),看看它们是否已完成或已撤销。当所有这些都成功返回时,和弦然后链接到任务B