我在Django视图中执行了几个芹菜任务(更具体地说是在Django Rest Framework的perform_create
方法中)。
我要实现的目标是(即,一旦任务具有ID/在结果后端)访问TaskResult
对象并使用它做点什么,例如:
tasks = [do_something.s(a) for a in (1, 2, 3, 4,)]
results = group(*tasks).apply_async()
for result in results.children:
task = TaskResult.objects.get(task_id=result.task_id)
do_something_with_task_object(task)
现在,django_celery_results.models.DoesNotExist: TaskResult matching query does not exist
失败。
我还没有尝试,但是我可以使用以下片段进行此操作。但这使我感到不错且丑陋,还要等到任务完成:
:while not all([TaskResult.objects.filter(task_id=t.task_id).exists() for t in results.children]):
pass
有什么方法可以以一种良好而干净的方式使这项工作?
事实证明,a)当您在stackoverflow上提出问题时,您就可以自己回答,b)django交易管理可以完成所需的一切。
如果您在atomic
包装中包装task.apply_async
的呼叫都很好,例如
with transactions.atomic():
results = group(*tasks).apply_async()
TaskResult.objects.get(task_id=results.children[0].task_id)
我不知道它是否适合所有人,但是使用django-celery-results==2.2.0
,作为上下文管理器的交易似乎不再起作用。另一方面,在post_save
信号中,似乎还可以。
# models.py
@receiver(post_save, sender=TaskResult)
def after_task_result(sender, instance, created, **kwargs):
if created: transaction.on_commit(lambda x:do_something())
但是,我在模型创建中未传递的视图中失去了变量。在这种情况下,仍然是丑陋的代码。
# views.py
while not TaskResult.objects.filter(task_id = task.id).exists(): pass
task = TaskResult.objects.get(task_id = task.id)
# do something more complex with local variables