我正在用Django编写一个灌溉系统,我使用Celery来处理异步任务。
在我的应用程序中,用户选择他想要激活灌溉的日期和时间,这存储在数据库中,他可以随时更新它们。
我知道我可以安排这样的任务,但我想更新时间。
from celery.task.schedules import crontab
from celery.decorators import periodic_task
@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week="mon"))
def every_monday_morning():
print("This runs every Monday morning at 7:30a.m.")
另一种方法是使用 python-crontab
做了类似的事情,通过有一个运行时表列出我想要运行的内容以及何时可以从 Django 中选择
。然后,定期任务本身被安排为每 30 秒运行一次,并检查此表中请求的任何内容是否准备好执行并调用其中一个工作线程。
编辑:根据格雷厄姆的要求,一个例子
Django 中设置节拍的设置:
CELERY_BEAT_SCHEDULE = {
'Runtime': {
'task': 'Runtime',
'schedule': timedelta(seconds=15),
},
}
调用 Django 中定义的模型的任务(例如,irrigation_requests(,用户将通过前端更新:
def Runtime():
#get all from model in state requested
irrigation_job=Irrigation_Requests.objects.filter(Status__in=['Requested'])
# Process each requested if flag to do just one thing then you could just test irrigation_job exists
for job in irrigation_job:
....
print ("This runs every when requested and depending on job parameters stored in your model can have further scope")
#update requested this could be done in loop above
irrigation_job.update(Status='Processed')
在我看到马克的回答后,我终于恍然大悟。所以我将使用类似于下面的代码来做到这一点:
from celery.task.schedules import crontab
from celery.decorators import periodic_task
@periodic_task(run_every=crontab(minutes="*"))
def run_every_minute():
now = datetime.datetime.now()
tasks = TasksModel.objects.all()
for task in Tasks:
if task.is_now():
fn = find_schedual_task(task.name)
fn()