上下文
我已经创建了一个django应用程序,该应用程序调用了芹菜任务,该任务依次产生其他任务并等待它们完成。
这是工作流程:
1)主python/django代码在后台启动芹菜任务
2)芹菜任务过程一些代码,然后启动芹菜组的一组不同的任务,然后等待它们准备就绪
3)小组的每个任务然后以相同的方式产生另一组子任务,然后等待它们完成
它运行良好(尽管我是乞eg,并且可能实现了很差),但是现在我希望能够终止每个孩子的过程,如果我杀死了开始的主要芹菜任务。
。我到目前为止拥有的
我已经使用一个简单的父任务来重新创建了这种情况,这些任务已产生多个子任务,并且我修改了芹菜任务类的" on_failure"方法,以杀死它的孩子。
tasks.py
from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')
class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
print('{0!r} failed: {1!r}'.format(task_id, exc))
@application.task(base=MyTask)
def childTask():
while True:
time.sleep(10)
print("Message de la tache enfant")
continue
@application.task(base=MyTask)
def parentTask(pra_id = None):
child_tasks = []
print("Lancement tache mère")
child_tasks.append(childTask.s())
child_tasks.append(childTask.s())
child_tasks.append(childTask.s())
tasks = group(child_tasks)
tasks.apply_async()
time.sleep(15)
raise KeyError
main.py
from tasks import parentTask
parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)
当代码引发错误时,父任务也会成功杀死,其子任务也是我想要的。
我需要
我需要能够从我的Django应用程序中手动杀死我的父任务。
这是通过检查芹菜工人并通过搜索其论点来找到我的任务来完成的,但是,当我找到芹菜任务时,这是成功完成的,它不会终止由此产生的孩子任务任务,这就是我需要的。
我到目前为止尝试了
我尝试创建一个由"撤消"信号触发的函数
(http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked)
将在撤销任务时执行。
捕获工作信号(在撤销任务时我能够执行一些代码),但是我无法使用与上述" on_failure"方法相同的代码来杀死Childs任务。
问题
发送到该函数的请求对象确实包含我的父任务,但是当该类包含包含子任务的groupresult对象时,类的"子"属性为空。
不确定这是否对您有帮助,但是我发现的工作有点好,可以将每个子任务存储在Redis或某些数据库中,并将其与某些数据库相关联pipeline_id。然后,如果我需要杀死父任务,我也可以杀死所有存储在列表中的子任务。
result.revoke(terminate=True)
subtask_results = get_subtask_status(pipeline_id) #Custom Function
for subtask_result in subtask_results:
subtask_result.revoke(terminate=True)
默认情况下,芹菜任务对象具有trail = True
,这意味着它将存储其子女。因此,您将能够使用request.Children或使用(异步)结果的孩子属性来获取它。一旦您列出了Child Task_ID列表,撤销这些任务就很微不足道。
请记住,在某些情况下,即使使用terminate=True
,芹菜也无法撤销任务,因此您实际上可能需要通过调用revoke(terminate=True, signal='SIGKILL')
来发送Sigkill。这不是芹菜中的错误,而是更多/更少取决于任务的性质及其做什么...