在我的Django项目中,我使用Celery和Rabbitmq在后台运行任务。我正在使用芹菜节拍调度程序来运行定期任务。如何以编程方式检查芹菜节拍是否启动并正在运行?
Ping URL 发出 HTTP 请求的任务。当 URL 未按时 ping 时,URL 监视器将向您发送警报。
import requests
from yourapp.celery_config import app
@app.task
def ping():
print '[healthcheck] pinging alive status...'
# healthchecks.io works for me:
requests.post("https://hchk.io/6466681c-7708-4423-adf0-XXXXXXXXX")
这个芹菜周期性任务计划每分钟运行一次,如果它没有点击ping,你的节拍服务关闭*,监视器将启动你的邮件(或webhook,这样你就可以zapier它来获取移动推送通知(。
celery -A yourapp.celery_config beat -S djcelery.schedulers.DatabaseScheduler
*或不知所措,您应该跟踪任务饱和度,这是 Celery 的噩梦,应该正确检测和解决,当工人忙于阻止需要优化的任务时经常发生
如果您按照芹菜文档的教程守护了芹菜,则可以通过以下方式检查它是否正在运行
sudo /etc/init.d/celeryd status
sudo /etc/init.d/celerybeat status
您可以在 python 模块中使用此类命令的返回。
您是否使用暴发户或监督或其他东西来运行芹菜工人 + 芹菜节拍作为后台任务?在生产中,您应该使用其中一个在后台运行芹菜工人 + 芹菜节拍。
检查芹菜节拍的最简单方法是运行:ps aux | grep -i '[c]elerybeat'
。如果您收到包含pid
的文本字符串,则它正在运行。你也可以使这个命令的输出更漂亮:ps aux | grep -i '[c]elerybeat' | awk '{print $2}'
。如果你得到数字 - 它正在工作,如果你什么也没得到 - 它不起作用。
您还可以检查芹菜工人状态:celery -A projectname status
。
如果您有兴趣进行高级芹菜监测,则可以阅读官方文档监控指南。
您可能可以查找主管。它提供了一个芹菜节拍 conf,记录与 /var/log/celery/beat.log
节拍相关的所有内容。
另一种方法是使用Flower。您可以为您的服务器设置它(确保其密码受保护(,在 GUI 中更容易注意到正在排队的任务以及它们排队的时间,从而验证您的节拍是否运行良好。
我最近针对同一问题使用了类似于@panchicore建议的解决方案。
我工作场所的问题是使用芹菜节拍的重要系统,偶尔,要么是由于 RabbitMQ 中断,要么是由于我们的服务器和 RabbitMQ 服务器之间的一些连接问题,因此芹菜节拍不再触发 crons,除非重新启动。
由于我们手边没有任何工具来监控通过HTTP发送keep alive
调用,因此我们将statsd用于相同的目的。statsd 服务器上每分钟都有一个计数器递增(由芹菜任务完成(,然后我们在 grafana 指标上设置电子邮件和 Slack 频道警报。 (no updates for 10 minutes == outage)
我知道这不是纯粹的编程方法,但是如果没有单独的监视实体,任何生产级别的监视/警报都是不完整的。
编程部分非常简单。每分钟运行一个小芹菜任务。
@periodic_task(run_every=timedelta(minutes=1))
def update_keep_alive(self):
logger.info("running keep alive task")
statsd.incr(statsd_tags.CELERY_BEAT_ALIVE)
我在这种方法中遇到的一个问题是由于 UDP 上的 STATSD 数据包丢失。因此,如果可能的话,请使用与 STATSD 的 TCP 连接来实现此目的。
您可以通过以下命令检查调度程序是否正在运行
python manage.py celery worker --beat
在最近做一个项目时,我使用了这个:
健康检查 CMD ["stat celerybeat.pid || exit 1"]
本质上,beat 进程在某个位置(通常是主位置(下写入一个 pid 文件,您所要做的就是获取一些统计信息来检查文件是否存在。
注意:这在 Docker 容器中启动独立的芹菜测试版进程时有效
的活动目标是检查芹菜节拍/调度程序是否能够将作业发送到消息代理,以便相应的使用者可以拾取它。[它是否仍在工作或处于挂起状态]。芹菜工作器和芹菜调度程序/节拍可能在同一 Pod 或实例中运行,也可能不运行。
为了处理这种情况,我们可以创建一个带有装饰器@after_task_publish.connect
update_scheduler_liveness
的方法,每次调度程序成功将消息/任务发布到消息代理时都会调用该方法。该方法 update_scheduler_liveness
将在每次成功发布任务时将当前时间戳更新为文件。在 Liveness 探测中,我们需要使用以下方法检查文件的上次更新时间戳: stat --printf="%Y" celery_beat_schedule_liveness.stat
命令或者我们可以显式尝试读取文件(读取模式(并提取时间戳,并根据活动探测标准比较时间戳是否是最近的。在这种方法中,您需要的分钟活度标准越多,必须从芹菜节拍触发作业的频率就越高。因此,对于那些作业之间的频率非常大的情况,可以每 2-5 分钟安排一次自定义/专用活动心跳作业,使用者可以处理它。 @after_task_publish.connect
装饰器提供了多个参数,这些参数也可用于过滤触发的活动特定作业
如果我们不想采用基于文件的方法,那么我们可以依赖 Redis 之类的数据源,以及特定于实例的 redis 键,这需要在同一行上实现。