姜戈芹菜周期性任务示例



我需要最少的例子来执行周期性任务(每 5 分钟运行一次某个函数,或者在 12:00:00 等运行一些函数(。

在我的myapp/tasks.py,我有,

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery import task

@periodic_task(run_every=(crontab(hour="*", minute=1)), name="run_every_1_minutes", ignore_result=True)
def return_5():
return 5

@task
def test():
return "test"

当我运行芹菜工人时,它确实显示了任务(如下所示(,但不返回任何值(在终端或花朵中(。

[tasks]
. mathematica.core.tasks.test
. run_every_1_minutes

请提供最少的示例或提示,以达到预期的结果。

背景:

我有一个包含以下内容的config/celery.py

import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

在我的config/__init__.py,我有

from .celery import app as celery_app
__all__ = ['celery_app']

我在myapp/tasks.py中添加了一个如下所示的函数

from celery import task
@task
def test():
return "test"

当我从 shell 运行test.delay()时,它运行成功,并且还以花朵显示任务信息

要运行定期任务,您还应该运行芹菜节拍。您可以使用以下命令运行它:

celery -A proj beat

或者,如果您使用一个辅助角色:

celery -A proj worker -B

相关内容

  • 没有找到相关文章

最新更新