处理芹菜中的"post_save"信号



我有一个运行时间相当长的任务,需要在插入或更新特定模型后执行。

我决定使用post_save信号而不是覆盖save方法来减少耦合。由于 Django 信号不是异步的,我不得不将长时间运行的工作作为 Celery 任务(我们已经在我们的堆栈中拥有(。

我的信号处理函数的简化版本如下:

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
handle_save_task.apply_async(args=(instance.pk,))

此外,由于作业是异步完成的,因此我传递了对象的主键而不是实例本身。

@app.task(queue='elastic')
def handle_save_task(instance_pk):
try:
instance = MyModel.objects.get(pk=instance_pk)
except ObjectDoesNotExist:
# Abort
logger.warning("Saved object was deleted before this task get a chance to be executed [id = %d]" % instance_pk)
else:
# Do my things with instance

实际问题是,当执行芹菜任务时,它无法访问新保存的实例。就像在保存之前执行一样!(信号不是叫post_保存吗?多么讽刺(

通过"保存前执行",我的意思是如果它是一个插入数据库的新实例,在芹菜任务中我得到一个DoesNotExist异常,在实例已经在数据库中并且调用 save 方法更新其某些属性的情况下,我在芹菜任务中获取具有旧属性值的旧实例。

解决方法是在几秒钟的延迟下运行芹菜任务,但显然这不是一个好的解决方案,也不能保证在重负载或长时间网络延迟下的正确执行行为。

我是否完全错了,或者只需稍作修改即可使其工作?

这可能是由于在事务中执行更新引起的。事务是在芹菜任务启动后提交的,这会导致芹菜任务在运行时看到您的旧值。

您可以尝试以下更改:

from django.db import transaction
@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
transaction.on_commit(lambda: handle_save_task.apply_async(args=(instance.pk,)))

最新更新