如何检查Celery/Supervisor是否正在使用Python运行



如何用Python编写一个脚本,在计算机(Ubuntu)上运行芹菜时输出?

我的用例。我有一个简单的python文件,里面有一些任务。我没有用Django或Flask。我使用supervisor来运行任务队列。例如,

任务.py

from celery import Celery, task
app = Celery('tasks')
@app.task()
def add_together(a, b):
    return a + b

主管:

[program:celery_worker]
directory = /var/app/
command=celery -A tasks worker info

这一切都起作用了,我现在想要一个页面来检查芹菜/主管进程是否正在运行。也就是说,像这样的东西可能使用Flask允许我托管页面,给出200状态允许我进行负载平衡。

例如。。。

check_status.py

from flask import Flask
app = Flask(__name__)
@app.route('/')
def status_check():
    #check supervisor is running
    if supervisor:
         return render_template('up.html')
    else:
        return render_template('down.html')
if __name__ == '__main__':
    app.run()

2020年9月更新:Jérôme在这里更新了Celery 4.3的答案:https://stackoverflow.com/a/57628025/1159735

您可以通过导入celery.bin.celery包的代码运行celery status命令:

import celery
import celery.bin.base
import celery.bin.celery
import celery.platforms
app = celery.Celery('tasks', broker='redis://')
status = celery.bin.celery.CeleryCommand.commands['status']()
status.app = status.get_app()
def celery_is_up():
    try:
        status.run()
        return True
    except celery.bin.base.Error as e:
        if e.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise e
if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

使用子流程怎么样,不确定这是否是个好主意:

>>> import subprocess
>>> output = subprocess.check_output('ps aux'.split())
>>> 'supervisord' in output
True

您可以从supervisorctl status输出解析进程状态

import subprocess
def is_celery_worker_running():
    ctl_output = subprocess.check_output('supervisorctl status celery_worker'.split()).strip()
    if ctl_output == 'unix:///var/run/supervisor.sock no such file':
        # supervisord not running
        return False
    elif ctl_output == 'No such process celery_worker':
        return False
    else:
        state = ctl_output.split()[1]
        return state == 'RUNNING'

灵感来自@vgel的答案,使用Celery 4.3.0。

import celery
import celery.bin.base
import celery.bin.control
import celery.platforms
# Importing Celery app from my own application
from my_app.celery import app as celery_app

def celery_running():
    """Test Celery server is available
    Inspired by https://stackoverflow.com/a/33545849
    """
    status = celery.bin.control.status(celery_app)
    try:
        status.run()
        return True
    except celery.bin.base.Error as exc:
        if exc.status == celery.platforms.EX_UNAVAILABLE:
            return False
        raise

if __name__ == '__main__':
    if celery_is_up():
        print('Celery up!')
    else:
        print('Celery not responding...')

稀疏的web用户界面带有supervisor。也许你可以用它。它可以在supervisor配置中启用。要查找的密钥是[inet_http_server]

您甚至可以查看这篇文章的源代码,以获得实现自己的想法。

这不适用于芹菜,但对于那些最终来到这里查看supervisord是否正在运行的人,请检查supervisord.conf配置文件中为supervisord定义的pidfile是否存在。如果是,它正在运行;如果不是,那就不是。默认的pidfile是/tmp/susupervisory.pid,这就是我在下面使用的文件。

import os
import sys
if os.path.isfile("/tmp/supervisord.pid"):
    print "supervisord is running."
    sys.exit()

根据我的经验,我会设置一条消息来跟踪它是否完成,以便队列负责重试任务。

相关内容

  • 没有找到相关文章

最新更新