我整天都在与这项任务作斗争。
我有一个Django应用程序。我使用芹菜来完成异步任务。有时,我想创建一个定期任务。任务运行未知但稍后需要删除的次数。所以任务可能是这样的:
@shared_task
def foobar_task(id):
if this_should_run:
do_task()
else:
PeriodicTask.objects.get(name='{} task'.format(id)).delete()
我的应用正在运行。我在 Docker 容器中运行芹菜节拍,使用celery --app=myproject beat --loglevel=info --scheduler=django
运行。我有另一个运行标准芹菜工人的容器。
所以现在我想动态创建我的定期任务。我有一个视图/API 端点,它会触发如下内容:
schedule, _ = IntervalSchedule.objects.get_or_create(every=15, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(interval=schedule,
name='{} task'.format(id),
task='myapp.tasks.foobar_task')
在 Django 管理中,我可以看到定期任务已创建。但是,看着芹菜容器和芹菜节拍容器的日志,没有任何反应。
为什么芹菜节拍没有发现有新的周期性任务?我不想每次创建或删除新任务时都必须重新启动芹菜节拍。
注意:我正在使用Django 1.11.2,PostgreSQL,Celery 4.0.2,Django Celery Beat 1.0.1。
您可以创建一个自定义调度程序,如下所示,改编自此答案。
from django_celery_beat.schedulers import DatabaseScheduler
class AutoUpdateScheduler(DatabaseScheduler):
def tick(self, *args, **kwargs):
if self.schedule_changed():
self.sync()
self._heap = None
new_schedule = self.all_as_schedule()
if new_schedule:
to_add = [x for x in new_schedule.keys() if x not in self.schedule.keys()]
to_remove = [x for x in self.schedule.keys() if x not in new_schedule.keys()]
for key in to_add:
self.schedule[key] = new_schedule[key]
for key in to_remove:
del self.schedule[key]
super(AutoUpdateScheduler, self).tick(*args, **kwargs)
@property
def schedule(self):
if not self._initial_read and not self._schedule:
self._initial_read = True
self._schedule = self.all_as_schedule()
return self._schedule
运行芹菜节拍时,请将其指向以下类:
celery --app=myproject beat --loglevel=info --scheduler=myproject.scheduler.AutoUpdateScheduler