我正在使用芹菜和Django创建一个periodic_task,我希望每X秒运行一次。。该任务应该产生几个子任务,但我需要确保每个主任务只产生一组子任务。
这就是我的。。
@periodic_task(run_every=datetime.timedelta(seconds=2))
def initialize_new_jobs():
for obj in Queue.objects.filter(status__in=['I', 'Q']):
obj = Queue.objects.get(id=obj.id)
if obj.status not in ['I', 'Q']:
continue
obj.status = 'A'
obj.save()
create_other_task.delay(obj.id)
这有点奏效,但感觉不对。我必须在循环开始时刷新obj,以确保另一个正在运行的periodic_task不会对同一Queue对象发出create_other_task。
有没有更好的方法来做这种工作?基本上,我想尽可能频繁地执行create_other_task,但对于状态为I或Q的每个Queue对象,只执行一次。
这是我的问题的缩短版本,所以请忽略这样一个事实,即我可以在创建Queue对象时运行create_other_task,而不是运行周期性任务:)
您可以使用事务:
@periodic_task(run_every=datetime.timedelta(seconds=2))
@transaction.commit_on_success
def initialize_new_jobs():
for obj in Queue.objects.select_for_update().filter(status__in=['I', 'Q']):
obj.status = 'A'
obj.save()
create_other_task.delay(obj.id)
select_for_update()
对行设置独占锁定,以便其他用户在尝试读取值时被阻止。在事务被提交或回滚之后,锁被释放。参考
通过这种方式,您可以确保obj
的状态为I
或Q
,并且obj.save()
将正常工作。