Python /芹菜:在杀死父任务时,如何杀死子任务



上下文

我已经创建了一个django应用程序,该应用程序调用了芹菜任务,该任务依次产生其他任务并等待它们完成。

这是工作流程:

1)主python/django代码在后台启动芹菜任务

2)芹菜任务过程一些代码,然后启动芹菜组的一组不同的任务,然后等待它们准备就绪

3)小组的每个任务然后以相同的方式产生另一组子任务,然后等待它们完成

它运行良好(尽管我是乞eg,并且可能实现了很差),但是现在我希望能够终止每个孩子的过程,如果我杀死了开始的主要芹菜任务。

我到目前为止拥有的

我已经使用一个简单的父任务来重新创建了这种情况,这些任务已产生多个子任务,并且我修改了芹菜任务类的" on_failure"方法,以杀死它的孩子。

tasks.py

from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')

class MyTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
        print('{0!r} failed: {1!r}'.format(task_id, exc))
@application.task(base=MyTask)
def childTask():
    while True:
        time.sleep(10)
        print("Message de la tache enfant")
        continue
@application.task(base=MyTask)
def parentTask(pra_id = None):
    child_tasks = []
    print("Lancement tache mère")
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    tasks = group(child_tasks)
    tasks.apply_async()
    time.sleep(15)
    raise KeyError

main.py

from tasks import parentTask
parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)

当代码引发错误时,父任务也会成功杀死,其子任务也是我想要的。

我需要

我需要能够从我的Django应用程序中手动杀死我的父任务。

这是通过检查芹菜工人并通过搜索其论点来找到我的任务来完成的,但是,当我找到芹菜任务时,这是成功完成的,它不会终止由此产生的孩子任务任务,这就是我需要的。

我到目前为止尝试了

我尝试创建一个由"撤消"信号触发的函数

(http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked)

将在撤销任务时执行。

捕获工作信号(在撤销任务时我能够执行一些代码),但是我无法使用与上述" on_failure"方法相同的代码来杀死Childs任务。

问题

发送到该函数的请求对象确实包含我的父任务,但是当该类包含包含子任务的groupresult对象时,类的"子"属性为空。

不确定这是否对您有帮助,但是我发现的工作有点好,可以将每个子任务存储在Redis或某些数据库中,并将其与某些数据库相关联pipeline_id。然后,如果我需要杀死父任务,我也可以杀死所有存储在列表中的子任务。

result.revoke(terminate=True)
subtask_results = get_subtask_status(pipeline_id) #Custom Function
for subtask_result in subtask_results:
    subtask_result.revoke(terminate=True)

默认情况下,芹菜任务对象具有trail = True,这意味着它将存储其子女。因此,您将能够使用request.Children或使用(异步)结果的孩子属性来获取它。一旦您列出了Child Task_ID列表,撤销这些任务就很微不足道。

请记住,在某些情况下,即使使用terminate=True,芹菜也无法撤销任务,因此您实际上可能需要通过调用revoke(terminate=True, signal='SIGKILL')来发送Sigkill。这不是芹菜中的错误,而是更多/更少取决于任务的性质及其做什么...

相关内容

  • 没有找到相关文章

最新更新