Django:如何只在启动时运行一个芹菜任务?



我有一个django应用程序,它使用芹菜来运行任务。

有时,我有一个"硬关机"。还有一堆模型没有清理干净。我创建了一个名为clean_up的任务,我想在启动时运行它。

tasks.py

from my_app.core import utils
from celery import shared_task

@shared_task
def clean_up():
f_name = clean_up.__name__

utils.clean_up()

celery.py是这样的:


import os
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_app.settings")
app = Celery("proj")

app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
app.conf.beat_schedule = {
"runs-on-startup": {
"task": "my_app.core.tasks.clean_up",
"schedule": timedelta(days=1000),
},
}

@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")

如何将celery.py更改为仅在启动时运行clean_up?

额外信息:

  • 这是一个docker组合,所以通过"硬关闭";我是说docker compose down
  • By "on start up"我是说docker compose up

你在找信号:

tasks.py添加以下代码:

from celery import signals
@signals.worker_ready.connect
def clean(**kwargs):
...
loggert.info('worker_ready')

最新更新