我使用Django和Celery来调度任务。
tasks.py
@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week="*"))
def hello_world():
print('hello world!)
除了每天运行该函数之外,我还需要能够手动调用它。如果用户按下前端的按钮,它就会运行。我可以做:
@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week="*"))
def hello_world():
print('hello world!)
@task()
def hello_world():
print('hello world!)
但这与DRY背道而驰。此外,我可能有不止一个函数会有同样的场景。我需要定期运行它,但也需要根据请求运行。
我试过这个:
def hello_world():
print('hello world!)
@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week="*"))
hello_world()
@task
hello_world()
但这并不奏效。我得到
无效语法
我在文档或其他stackoverflow答案中找不到这个场景。他们讨论从shell调用函数。我们将不胜感激。
谢谢!
您不需要定义两次任务,您的周期性任务也可以使用hello_world.delay()
手动调用,因此您将具有:
@periodic_task(run_every=crontab(hour='00', minute='00', day_of_week='*'))
def hello_world():
print('hello world!')
def on_button_press():
hello_world.delay()