我发现我可以从这里将任务设置为在特定时间以特定间隔运行,但这只能在任务声明期间完成。如何设置任务定期动态运行?
调度派生自一个设置,因此在运行时似乎是不可变的。
你可能可以完成你正在寻找使用任务eta。这保证了您的任务不会在期望的时间之前运行,但不保证在指定的时间运行任务-如果工作者在指定的ETA上过载,任务可能会延迟运行。
如果这个限制不是问题,你可以写一个任务,它首先会自己运行:
@task
def mytask():
keep_running = # Boolean, should the task keep running?
if keep_running:
run_again = # calculate when to run again
mytask.apply_async(eta=run_again)
# ... do the stuff you came here to do ...
这种方法的主要缺点是你依赖于taskstore来记住正在运行的任务。如果其中一个在启动下一个之前失败,那么该任务将永远不会再次运行。如果您的代理没有持久化到磁盘,并且它死亡(带走所有正在运行的任务),那么这些任务都不会再次运行。
您可以通过某种事务日志记录和周期性的"保姆"任务来解决这些问题,该任务的工作是查找过早死亡的重复任务并使其恢复。
如果我必须实现你所描述的,我想我会这样做。
celery.task.base.PeriodicTask
定义is_due
,它决定下一次运行的时间。您可以覆盖此函数以包含自定义的动态运行逻辑。查看这里的文档:http://docs.celeryproject.org/en/latest/reference/celery.task.base.html?highlight=is_due#celery.task.base.PeriodicTask.is_due
一个例子:
import random
from celery.task import PeriodicTask
class MyTask(PeriodicTask):
def run(self, **kwargs):
logger = self.get_logger(**kwargs)
logger.info("Running my task")
def is_due(self, last_run_at):
# Add your logic for when to run. Mine is random
if random.random() < 0.5:
# Run now and ask again in a minute
return (True, 60)
else:
# Don't run now but run in 10 secs
return (True, 10)
见这里http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
我认为你不能使它动态…最好的方法是在task中创建task:D
举个例子,你想在X秒后运行一些东西,然后你创建一个X秒延迟的新任务,并在这个任务中创建另一个N*X秒延迟的任务…
这应该会帮助你…http://celery.readthedocs.org/en/latest/faq.html can-i-change-the-interval-of-a-periodic-task-at-runtime
一旦你定义了一个自定义的时间表,就像asksol上面建议的那样把它分配给你的任务。
CELERYBEAT_SCHEDULE = {
"my_name": {
"task": "myapp.tasks.task",
"schedule": myschedule(),
}
}
如果您希望您的日程安排比每五分钟更新更频繁,您可能还需要修改CELERYBEAT_MAX_LOOP_INTERVAL。