我正在用芹菜做一项长期任务。该任务将使用subprocess.Popen
创建一个子流程。为了使任务可中止,我编写了以下代码:
from celery.contrib import abortable
@task(bind=True, base=abortable.AbortableTask)
def my_task(self, *args):
p = subprocess.Popen([...])
while True:
try:
p.wait(1)
except subprocess.TimeoutExpired:
if self.is_aborted():
p.terminate()
return
else:
break
# Other codes...
我在控制台中尝试了一下,效果很好。但是当我决定通过按Ctrl+C
关闭工人时,程序会打印出'worker: Warm shutdown (MainProcess)'
并长时间阻塞,这不是我所期望的。似乎任务堕胎不会在工人即将关闭时发生。
从文档中我知道,如果我想中止任务,我应该使用任务 id 手动实例化AbortableAsyncResult
并调用其.abort()
方法。但是我找不到放置此代码的位置,因为它需要所有正在运行的任务的 id,而我无法访问这些任务。
那么,当工作人员即将关闭时,如何为所有正在运行的任务调用.abort()
呢?还是有其他选择?
我正在使用芹菜 4.1.0 和 python 3.6.2。
您可以使用工作线程信号来实现此目的。只需获取所有正在运行的任务并对其调用 .abort((。
受到@Ishaan答案的启发,我自己解决它,使用如下代码:
def my_task(*args):
p = None
from celery.platforms import signals
def int_handler(signum, frame):
if p is not None:
p.kill()
p.wait()
signals['INT'] = int_handler
p = subprocess.Popen([...])
p.wait()
# Other codes...
该解决方案基于以下注意事项:
- 任务的内部范围是我能找到的每个工作人员执行的唯一位置。
- 在任务中,我可以轻松访问创建的子流程。
- SIGINT 信号不由芹菜工作人员处理,因此它不会覆盖芹菜的默认行为。 一个
- 工作人员一次将运行一个任务,因此这种注册是安全的。