Celery raises an error when I pass my queryset obj as a parameter



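I am trying to run a periodic task, so I am using Celery with Django 1.8, Django Rest Framework, and Postgres as the database. When I try to send my obj to the task, I get TypeError: foreign_model_obj is not JSON serializable. How can I pass a queryset object to my task?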

views.py:

class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)
    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])
        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, {"id": obj.id})
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj], eta=local_time)
        return Response(result)

tasks.py:

@celery_app.task(name="my_celery_task")
def my_first_celery_task(mymodel_obj):
    # ... updating obj attributes
    mymodel_obj.save()

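You just have to send the id of your instance and retrieve the object in the task. Passing the instance is bad practice because it can be altered in the meantime, especially if you execute your task with a delay.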

views.py:

class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)
    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])
        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, {"id": obj.id})
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj.id], eta=local_time) # send only the obj id
        return Response(result)

tasks.py:

@celery_app.task(name="my_celery_task")
def my_celery_task(mymodel_obj_id):
    mymodel_obj = MyModel.objects.get(id=mymodel_obj_id) # retrieve your object here
    # ... updating obj attributes
    mymodel_obj.save()
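
The reason the id works where the instance fails can be reproduced without Django or Celery. The following is a minimal sketch, assuming Celery's JSON serializer treats plain arguments the way the standard json module does; FakeModel is a hypothetical stand-in for a model instance, not a Django class.

```python
import json


class FakeModel:
    """Hypothetical stand-in for a Django model instance."""

    def __init__(self, pk):
        self.id = pk


def can_serialize(args):
    """Return True if the task arguments survive JSON encoding."""
    try:
        json.dumps(args)
        return True
    except TypeError:
        return False


obj = FakeModel(42)
instance_ok = can_serialize([obj])   # arbitrary objects are rejected by json
id_ok = can_serialize([obj.id])      # a plain integer id is accepted
print(instance_ok, id_ok)
```

The instance fails with the same kind of TypeError as in the question, while the integer id passes through unchanged.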

Actually, IMHO the best way is to take the picklable component of the queryset and then regenerate the queryset in the task (https://docs.djangoproject.com/en/1.9/ref/models/querysets/):

import pickle
qs = MyModel.objects.filter(a__in=[1,2,3]) # whatever you want here...
querystr = pickle.dumps(qs.query)      # pickle the query, not the evaluated results
my_celery_task.apply_async([querystr], eta=local_time) # args must be a list or tuple; send only the string...

Task:

import pickle

@celery_app.task(name="my_celery_task")
def my_celery_task(querystr):
    my_model_objs = MyModel.objects.all()
    my_model_objs.query = pickle.loads(querystr) # Restore the queryset
    # ... updating obj attributes
    item = my_model_objs[0]

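I think this is the best approach because the query is executed in the task (possibly for the first time), which prevents various timing problems, and it does not need to be executed in the caller (so the query is not run twice).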
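
One caveat with this approach: pickle.dumps returns bytes on Python 3, and a JSON-serialized message may not accept raw bytes. A minimal sketch of one possible workaround is to base64-encode the pickled payload before sending and decode it in the task. The dict below is only a hypothetical stand-in for qs.query, and encode_query / decode_query are illustrative helper names, not part of Django or Celery.

```python
import base64
import json
import pickle


def encode_query(query):
    """Pickle an object and wrap it as an ASCII string that JSON can carry."""
    return base64.b64encode(pickle.dumps(query)).decode("ascii")


def decode_query(payload):
    """Reverse encode_query: base64-decode, then unpickle."""
    return pickle.loads(base64.b64decode(payload.encode("ascii")))


# Hypothetical stand-in for qs.query, used here so the sketch runs without Django.
fake_query = {"model": "MyModel", "filter": {"a__in": [1, 2, 3]}}

payload = encode_query(fake_query)
message = json.dumps([payload])            # roughly what a JSON serializer would send
restored = decode_query(json.loads(message)[0])
```

As with any pickled data, only unpickle payloads that come from a broker you trust.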

You can change the serialization method to pickle, but passing a queryset as a parameter is not recommended. Quoting the Celery documentation:

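Another gotcha is Django model objects. They shouldn't be passed on as arguments to tasks. It's almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions.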

http://docs.celeryproject.org/en/latest/userguide/tasks.html
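
The race condition the Celery documentation warns about can be illustrated with a plain dictionary standing in for the database. This is a sketch only; FAKE_DB and both task functions are hypothetical names invented for the example.

```python
import copy

# Hypothetical in-memory "database": primary key -> row.
FAKE_DB = {1: {"id": 1, "status": "new"}}


def task_with_instance(row):
    """Works on the snapshot taken when the task was queued."""
    return row["status"]


def task_with_id(pk):
    """Re-fetches the row when the task actually runs."""
    return FAKE_DB[pk]["status"]


# Caller queues the task: one variant captures a copy, the other only the id.
snapshot = copy.deepcopy(FAKE_DB[1])
queued_id = 1

# Before the task's eta arrives, another request updates the row.
FAKE_DB[1]["status"] = "cancelled"

stale = task_with_instance(snapshot)   # sees the old value
fresh = task_with_id(queued_id)        # sees the current value
print(stale, fresh)
```

The task that received the snapshot acts on outdated state, while the one that received only the id sees the update made in the meantime.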
