我正在尝试将post post请求通话给我们的API芹菜,因为我们将尽快将最多10个请求发送到我们的API,每个请求都将有在我们的数据库中创建的100多个对象。我认为我会将它们添加到队列中,然后让redis 芹菜处理它,然后从那里起作用。
我遇到了一些问题。
首先,我的芹菜设置:
########## CELERY
import djcelery
djcelery.setup_loader()
INSTALLED_APPS += ['expert.taskapp.celery.CeleryConfig']
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://127.0.0.1:6379')
if CELERY_BROKER_URL == 'django://':
CELERY_RESULT_BACKEND = 'redis://'
else:
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
########## END CELERY
在我的django休息框架中使用class clast视图,这是我到目前为止的视图:
from celery import shared_task
from celery.decorators import task
from .tasks import create
class DataCreateAPIView(CreateAPIView):
def create(self, request, *args, **kwargs):
create.delay(request)
因此,这个想法是让创建视图将所有操作都付诸实践,直到我到达过程的创建部分为止,我立即将创建任务从芹菜中卸载。
在我的任务中。
from celery import shared_task
from celery.decorators import task
from expert.models import Chamber, Parameter, Sensor, Data
@task(name='POST request Data point.')
def create(self, request, *args, **kwargs):
queryset = Data.objects.all()
queryset = DataCreateSerializer.setup_eager_loading(queryset)
# serializer_class = DataCreateSerializer
try:
sensor = Sensor.objects.get(serial_number=request.data["data_source"])
request.data["data_source"] = sensor.id
except Sensor.DoesNotExist:
print("Sensor serial number " + str(request.data["data_source"]) + " not registered.")
return Response(status=status.HTTP_404_NOT_FOUND)
dataDict = dict(request.data)
for param in dataDict['parameters']:
Parameter.objects.get_or_create(parameter_name=param, parameter_position="None")
final_data = []
for data in dataDict['data_array']:
zipped = zip(dataDict['parameters'], data['values'])
for parameter, value in zipped:
# parameter = Parameter.objects.get_or_create(parameter_name=parameter, parameter_position="None")[0]
parameter = Parameter.objects.get(parameter_name=parameter)
final_data.append({
"sensor": sensor.id,
"parameter": parameter.id,
"time": data['time'],
"parameter_value": value
})
serializer = DataCreateSerializer(data=final_data, many=True)
if serializer.is_valid():
serializer.save()
return Response(status=status.HTTP_200_OK)
return Response(serializer.errors)
我在大量请求中采取的位置,对其进行一些修改以适合我们的模式,然后进行数据库写作。
现在,如果我只采用相同的"创建"函数,然后将其直接放在CreateApiview。
当我尝试使用芹菜进行操作时,初始化芹菜工人时会在任务列表中显示任务,但是我无法获得接触芹菜的请求。在此之前,我闯入以下错误:
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/core/handlers/exception.py" in inner
42. response = get_response(request)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/core/handlers/base.py" in _get_response
187. response = self.process_exception_by_middleware(e, request)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/core/handlers/base.py" in _get_response
185. response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/usr/lib/python3.5/contextlib.py" in inner
30. return func(*args, **kwds)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/views/decorators/csrf.py" in wrapped_view
58. return view_func(*args, **kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/django/views/generic/base.py" in view
68. return self.dispatch(request, *args, **kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/views.py" in dispatch
489. response = self.handle_exception(exc)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/views.py" in handle_exception
449. self.raise_uncaught_exception(exc)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/views.py" in dispatch
486. response = handler(request, *args, **kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/rest_framework/generics.py" in post
192. return self.create(request, *args, **kwargs)
File "/home/luke/Projects/expert/impedans_expert/impedans_expert/expert/api/views.py" in create
249. create.delay(request)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/task.py" in delay
461. return self.apply_async(args, kwargs)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/task.py" in apply_async
573. **dict(self._get_exec_options(), **options)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/base.py" in send_task
354. reply_to=reply_to or self.oid, **options
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/celery/app/amqp.py" in publish_task
310. **kwargs
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/messaging.py" in publish
165. compression, headers)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/messaging.py" in _prepare
241. body) = dumps(body, serializer=serializer)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in dumps
164. payload = encoder(data)
File "/usr/lib/python3.5/contextlib.py" in __exit__
77. self.gen.throw(type, value, traceback)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in _reraise_errors
59. reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/five.py" in reraise
131. raise value.with_traceback(tb)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in _reraise_errors
55. yield
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in dumps
164. payload = encoder(data)
File "/home/luke/.virtualenvs/expert/lib/python3.5/site-packages/kombu/serialization.py" in pickle_dumps
356. return dumper(obj, protocol=pickle_protocol)
Exception Type: EncodeError at /expert/api/data/create/
Exception Value: cannot serialize '_io.BufferedReader' object
老实说,我必须知道如何从那里去。我已经尝试搜索谷歌例外,但是即使我看到了一个页面看起来相似的页面,我也无法理解该线程。
我将非常感谢在此问题上的任何帮助。
基于功能的芹菜任务要求传递给它的参数可序列化。您会遇到一个错误,这意味着有一些无法序列化的论点。
首先,您需要修复create
功能签名,因为它不应将self
作为参数:
@task(name='POST request Data point.')
def create(request):
...
让我知道这是否有效,因为我正在研究类似的解决方案。
这很难不可调试,但无论如何我都会从更改dataCreateApiview.create。tasks.py