使用 RabbitMQ 更新芹菜中的任务



我在我的django项目中使用Celery来创建任务,以便在将来的特定时间发送电子邮件。用户可以创建具有notify_on日期时间字段的通知实例。然后我将notify_on的值作为eta传递。

class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()

def notification_post_save(instance, *args, **kwargs):
    send_notification.apply_async((instance,), eta=instance.notify_on)
signals.post_save.connect(notification_post_save, sender=Notification)

这种方法的问题在于,如果用户更改notify_on,他将收到两个(或更多(通知,而不是一个。

问题是如何更新与特定通知关联的任务,或者以某种方式删除旧通知并创建新通知。

首先,通过使用 post_save ,我们无法获取旧数据。因此,在这里我重写了Notification模型的save()方法。除此之外,创建一个字段来存储芹菜task_id。

from celery.task.control import revoke

class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()
    celery_task_id = models.CharField(max_length=100)
    def save(self, *args, **kwargs):
        pre_notify_on = Notification.objects.get(pk=self.pk).notify_on
        super().save(*args, **kwargs)
        post_notify_on = self.notify_on
        if not self.celery_task_id:  # initial task creation
            task_object = send_notification.apply_async((self,), eta=self.notify_on)
            Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)
        elif pre_notify_on != post_notify_on:
            # revoke the old task
            revoke(self.celery_task_id, terminate=True)
            task_object = send_notification.apply_async((self,), eta=self.notify_on)
            Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)

参考

  1. 取消已经使用芹菜执行的任务?
  2. Django:如何在信号中访问原始(未修改post_save实例
  3. (

我认为没有必要删除以前的任务。您只需要验证正在执行的任务是否是最后一个任务。为此,请创建一个名为校验和的新字段,该字段是每次更改notify_on时更新该字段的 UUID 字段。在发送电子邮件的任务中检查此校验和。

class Notification(models.Model):
    checksum = models.UUIDField(default=uuid.uuid4)
    notify_on = models.DateTimeField()
def notification_post_save(instance, *args, **kwargs):
    send_notification.apply_async((instance.id, str(instance.checksum)),eta=instance.notify_on)
signals.post_save.connect(notification_post_save, sender=Notification)

@shared_task 
def send_notification(notification_id, checksum):
    notification = Notification.objects.get(id=notification_id)
    if str(notification.checksum) != checksum:
        return False
    #send email

另外,请不要每次在通知对象上发送信号,只需在notify_on更改时发送即可。你也可以检查这个识别 django post_save信号中更改的字段

相关内容

  • 没有找到相关文章

最新更新