我对此进行了大量研究,我很惊讶我还没有在任何地方找到一个好的答案。
我在 Heroku 上运行一个大型应用程序,我有一些芹菜任务运行很长时间处理,并在任务结束时保存结果。每次我在 Heroku 上重新部署时,它都会发送 SIGTERM(最终是 SIGKILL)并杀死我正在运行的工人。 我正在尝试找到一种方法,让 worker 实例正常关闭自身并重新排队以供以后处理,以便最终我们可以保存所需的结果,而不是丢失排队的任务。
我找不到一种方法可以让工人正确收听 SIGTERM。 我得到的最接近的,它在直接运行python manage.py celeryd
时有效,但在使用 foreman 模拟 Heroku 时不起作用,如下:
@app.task(bind=True, max_retries=1)
def slow(self, x):
try:
for x in range(100):
print 'x: ' + unicode(x)
time.sleep(10)
except exceptions.MaxRetriesExceededError:
logger.error('whoa')
except (exceptions.WorkerShutdown, exceptions.WorkerTerminate) as exc:
logger.error(u'retrying, ' + unicode(exc))
raise self.retry(exc=exc, countdown=10)
except (KeyboardInterrupt, SystemExit) as exc:
print 'retrying'
raise self.retry(exc=exc, countdown=10)
else:
return x
finally:
logger.info('task ended!')
当我开始在工头中运行的这个芹菜任务并按 Ctrl+C 时,会发生以下情况:
^CSIGINT received
22:20:59 system | sending SIGTERM to all processes
22:20:59 web.1 | exited with code 0
22:21:04 system | sending SIGKILL to all processes
Killed: 9
所以很明显,没有一个芹菜例外,也没有我在其他帖子中看到的KeyboardInterrupt
或SystemExit
例外,正确地抓住了SIGTERM并关闭了工人。
正确的方法是什么?
从版本>= 4 开始,Celery 附带了一项特殊功能,仅适用于 Heroku,它支持开箱即用的功能:
$ REMAP_SIGTERM=SIGQUIT celery -A proj worker -l info
来源: https://devcenter.heroku.com/articles/celery-heroku#using-remap_sigterm
不幸的是,芹菜的设计不是为了干净关闭。曾。我说真的。芹菜工人响应 SIGTERM,但如果任务未完成,工作进程将等待完成任务,然后才退出。在这种情况下,如果工作人员没有在合理的时间内关闭,您可以将其发送给SIGKILL,但在这种情况下会丢失信息,即您可能不知道哪些作业仍未完成。
您可以使用acks_late或task_acks_late。
任务将在任务完成执行后从队列中确认,而不仅仅是在之前。因此,如果工作线程正常关闭,任务将重新生成。