我有一个django应用程序,它使用芹菜来运行任务。
有时,我有一个"硬关机"。还有一堆模型没有清理干净。我创建了一个名为clean_up
的任务,我想在启动时运行它。
tasks.py
from my_app.core import utils
from celery import shared_task
@shared_task
def clean_up():
f_name = clean_up.__name__
utils.clean_up()
celery.py
是这样的:
import os
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_app.settings")
app = Celery("proj")
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django apps.
app.autodiscover_tasks()
app.conf.beat_schedule = {
"runs-on-startup": {
"task": "my_app.core.tasks.clean_up",
"schedule": timedelta(days=1000),
},
}
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")
如何将celery.py
更改为仅在启动时运行clean_up
?
额外信息:
- 这是一个docker组合,所以通过"硬关闭";我是说
docker compose down
- By "on start up"我是说
docker compose up
你在找信号:
tasks.py
添加以下代码:
from celery import signals
@signals.worker_ready.connect
def clean(**kwargs):
...
loggert.info('worker_ready')