如何使用Django在Celery中正确创建任务



我请求帮助完成这项任务。有一个通知模型。我想创建一个异步任务来创建通知。但是我得到一个错误,类型为MPTModelBase的Object是不可JSON序列化的。型号.py

class Comment(MPTTModel):
"""Модель комментариев"""
content_type = models.ForeignKey(ContentType, verbose_name=_('Тип контента'), related_name='content_ct_%(class)s', on_delete=models.CASCADE)
object_id = models.PositiveIntegerField(verbose_name=_('ID контента'), db_index=True)
content_object = GenericForeignKey('content_type', 'object_id')
"""Род.коммент"""
parent = TreeForeignKey('self', on_delete=models.CASCADE, verbose_name=_('Родительский комментарий'), blank=True,
null=True, related_name='children')
"""Инфо, привязка, модерация"""
content = models.TextField(verbose_name=_('Комментарий'))
created_by = models.ForeignKey(AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='comment_created_by', verbose_name=_('Автор комментария'))
is_published = models.BooleanField(verbose_name=_('Опубликовать'), default=True)
time_create = models.DateTimeField(auto_now_add=True, verbose_name=_('Дата добавления'))
"""Generic FK"""
rating = GenericRelation('Rating', related_query_name='%(class)s')
notification = GenericRelation('Notification', related_query_name='%(class)s')

def save(self, *args, **kwargs):
send_create_notification.delay(self, 3)
super().save(*args, **kwargs)

服务.py

def create_notification(instance, type):
"""Notification create"""
from modules.core.models import Notification
model_object = instance
obj = model_object.content_object
try:
text = model_object.content[0:120]
except:
text = None
try:
to_user = obj.created_by
except:
to_user = obj
from_user = model_object.created_by
now = timezone.now()
last_minute = now - datetime.timedelta(seconds=60)
similar_actions = Notification.objects.filter(from_user=from_user, to_user=from_user, type=type, time_create__gte=last_minute)
if obj:
from django.contrib.contenttypes.models import ContentType
content_type = ContentType.objects.get_for_model(obj)
similar_actions = similar_actions.filter(content_type=content_type, object_id=obj.id)
if not similar_actions:
if text:
notification = Notification(from_user=from_user, to_user=to_user, type=type, content_object=obj, text=text)
else:
notification = Notification(from_user=from_user, to_user=to_user, type=type, content_object=obj)
notification.save()
return True
return False

tasks.py

@shared_task()
def send_create_notification(self, type):
return create_notification(self, type)

send_create_notification.delay(self, 3)将尝试序列化Comment实例,该实例不可JSON序列化。您可以使用pickle序列化程序,但不建议使用。

相反,我建议您将注释id作为参数发送到任务:

send_create_notification.delay(self.pk, 3)

并获取任务上的实例

@shared_task()
def send_create_notification(comment_id, type):
comment = Comment.objects.get(pk=comment_id)
return create_notification(comment, type)

相关内容

  • 没有找到相关文章

最新更新