芹菜生产优雅地重新开始



我需要重新启动芹菜守护程序,但我需要告诉当前的工人在完成任务时关闭,然后在旧工人仍在关闭时旋转一组新的工人。

守护程序上的当前优雅选项会等待所有任务在重新启动之前完成,这在您长期运行的作业时就没有用。

请不要建议自动加载,因为当前在4.0.2中没有记录。

好吧,我最终做的是使用Subersisord,并且可以管理此操作。

[program:celery_worker]
# Max concurrent task you wish to run.
numprocs=5
process_name=%(program_name)s-%(process_num)s
directory=/opt/worker/main
# Ignore this unless you want to use virtualenvs.
environment=PATH="/opt/worker/main/bin:%(ENV_PATH)s"
command=/opt/worker/bin/celery worker -n worker%(process_num)s.%%h --app=python --time-limit=3600 -c 5 -Ofair -l debug --config=celery_config -E
stdout_logfile=/var/log/celery/%(program_name)s-%(process_num)s.log
user=worker_user
autostart=true
autorestart=true
startretries=99999
startsecs=10
stopsignal=TERM
stopwaitsecs=7200
killasgroup=false

您可以使用主管停止/启动工人加载新代码,但是它将等待所有人停止,然后再开始启动它们,这对于长期运行的作业效果不佳。最好只是称为主过程,这会告诉工人在完成工作后停止接收工作。

ps aux | grep *celery.*MainProcess | awk '{print $2}' | xargs kill -TERM

主管将在死亡时重新启动。

当然,在没有完全停止所有工人的情况下更新依赖性是不可能的,这是使用Docker之类的东西的一个很好的理由。;)

在芹菜4上,我必须修补基本任务类才能使其正常工作。来源

import signal
from celery import Celery, Task
from celery.utils.log import get_task_logger
logger = get_task_logger('my_logger')
class MyBaseTask(Task):
    def __call__(self, *args, **kwargs):
        signal.signal(signal.SIGTERM,
                      lambda signum, frame: logger.info('SIGTERM received, 
                                            wait till the task finished'))
        return super().__call__(*args, **kwargs)
app = Celery('my_app')
app.Task = MyBaseTask

还有一个补丁可防止在警告关闭时重新安排

我们的任务最多可能需要48小时。当我们有新版本,并且我们将新版本部署到生产中时,您谈论的优雅重新开始非常普遍。我们要做的就是简单地将Sigterm(关闭)信号发送给跑步的工人,然后并行旋转全新的工人。

相关内容

  • 没有找到相关文章

最新更新