我按照这里的教程来让我的django项目中定义的定期任务正常工作。
这篇文章建议有一个celery.py
文件的形式:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, my_task.s('hello'), name='add every 10')
)
@app.task
def my_task(arg):
print(arg)
这是有效的。现在这很好,但我不想在本地定义我的任务。我的问题是,如何从其他应用程序添加任务?
我创建了一个名为my_proj
的空白项目,它有两个应用程序:my_proj
和app_with_tasks
。上面的celery.py
文件位于my_proj
应用程序目录的根级别,我想从app_with_tasks
的tasks.py
文件中添加定期任务。
我确实在my_proj
设置文件的已安装应用程序中列出了app_with_tasks
,但我仍然无法将任何内容从应用程序导入到另一个应用程序。
我的理解是我应该使用:
from app_with_tasks.tasks import task1
但是CCD_ 11随后将在PyCharm中显示为未解析引用。
我会告诉你我在用什么。也许它能帮助你
my_proj/celery.py
import os
import celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_proj.settings')
app = celery.Celery('app_django')
app.config_from_object('django.conf.settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
然后在app_with_tasks中,添加文件tasks.py
from my_proj.celery import app
from django.apps import apps
@app.task(bind=False)
def your_task(some_arg):
A_Model = apps.get_model('my_proj', 'A_Model')
....
启动芹菜服务器的命令(每次更改任务以重新加载tasks.py文件时重新启动此命令)
/path/to/virtualenv/bin/celery --app=my_proj.celery:app --loglevel=INFO --concurrency=4 -n default_worker worker
调用任务(此处应使用add_periodic_task代码)
from app_with_tasks.tasks import your_task
your_task.apply_async(args=[123], kwargs=None)