芹菜:中止或撤销和弦中的所有任务



我在Redis代理和后端使用以下设置:

chord([A, A, A, ...])(B)

  • 任务A做一些检查。它使用AbortableTask作为基础,并定期检查task.is_aborted()标志
  • 任务B通知用户计算结果

用户有可能中止A任务。不幸的是,在所有任务A实例上调用AbortableAsyncResult(task_a_id).abort()时,只有活动实例被中止。工作人员尚未接收到的任务的状态更改为ABORTED,但它们仍在处理中,并且is_aborted()标志返回False。

当然,我可以revoke()挂起的任务,而不是abort(),但问题是在这种情况下,和弦主体(任务B)不再执行。

如何在确保任务B运行的同时停止所有挂起和正在运行的任务A实例?

只需获取A所有实例的id列表并停止它们。

考虑一下这个简单的和弦

from celery import chord 
my_chord = chord(a.si() for i in range(300))(b.si())

现在,您可以使用从my_chord实例中获得子任务列表(a任务的所有实例)

for taks in my_chord.parent.subtasks:
    print(task.id)

现在,您可以对这些任务实例执行任何您想执行的操作。例如,您可以撤销所有的,而不管它们的当前状态如何。

from celery.task.control import revoke
for task in my_chord.parent.subtasks:
    revoke(task.id, terminate=True)

默认情况下,revoke只杀死挂起的任务。但是,如果将terminate=True传递给它,它也会杀死正在执行的任务。

此外,chord的回调函数将在其所有子任务成功执行后调用。由于您正在取消chord的子任务,因此不会调用回调函数,并且chord任务将导致失败。因此,您必须重试回调任务。

与其将任务本身编为和弦,不如考虑让和弦任务监视A任务。我的意思是,和弦将包含任务,这些任务每隔一段时间就会检查正在运行的任务(A),看看它们是否已完成或已撤销。当所有这些都成功返回时,和弦然后链接到任务B

相关内容

  • 没有找到相关文章

最新更新