Stack:
- django = "==2.2.2"
- django-celery-beat = "==1.4.0"
- celery = "==v4.3.0rc1"
- python_version = "3.7"
I have a class that imports some data from a csv/xls file and saves that data. This is my Celery configuration:
CELERY_TASK_ALWAYS_EAGER = False
CELERY_BROKER_URL = config('REDIS_BROKER_URL')
CELERY_RESULT_BACKEND = config('REDIS_RESULT_URL')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
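For context, settings with a CELERY_ prefix like the ones above are only read if the Celery app loads the Django settings with that namespace. The post does not show that file, so the following is only a sketch of the usual app module, assuming a hypothetical project package named proj; celery_app is the name the task decorator further down relies on.

```python
# proj/celery.py ("proj" is a hypothetical package name, not from the post)
import os

from celery import Celery

# Tell Celery which Django settings module to use (assumed path).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

celery_app = Celery('proj')

# namespace='CELERY' makes the app read CELERY_BROKER_URL,
# CELERY_RESULT_BACKEND, CELERY_TASK_SERIALIZER, etc. from Django settings.
celery_app.config_from_object('django.conf:settings', namespace='CELERY')

# Look for a tasks.py module in every installed Django app.
celery_app.autodiscover_tasks()
```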
This is where I call my task (in a class-based view):
def form_valid(self, form):
    if self.request.is_ajax():
        # Save the form once and keep the created instance.
        instance = form.save()
        kwargs = {
            'corporation_id': self.corporation.id,
            'file_id': instance.id,
        }
        # apply_async() returns an AsyncResult; str() of it is the task id.
        task_id = import_file_task.apply_async(
            kwargs=kwargs,
        )
        instance.tas_id = task_id
        instance.save()
        return JsonResponse(
            {
                'form_status': 'Success',
                'task_id': str(task_id),
            }
        )
    return super().form_valid(form)
This is my task:
@celery_app.task(bind=True)
def import_file_task(_, corporation_id, file_id):
    sale_file = SaleFile.objects.get(
        id=file_id,
        corporation_id=corporation_id,
    )
    if sale_file.type == PRODUCT_FILE:
        error = ProductImporter(
            corporation_id=corporation_id,
            file_id=file_id,
            product_file=sale_file,
        ).save()
    elif sale_file.type == RECEIVABLE_FILE:
        error = ReceivableImporter(
            corporation_id=corporation_id,
            file_id=file_id,
            receivable_file=sale_file,
        ).save()
    else:
        raise ValueError('File type is not valid')
    task = AsyncResult(sale_file.tas_id)
    task.info = error
    task.status = 'COMPLETED'
This is where I try to poll the task status, but I get an error:
class TaskStatus(View):
    def get(self, request):
        task_id = request.GET.get('_task_id')
        task = AsyncResult(task_id)
        print(task)
        print(task.state)  # HERE IS THE ERROR
        print(dir(task))  # THE STATUS APPEARS HERE
        success_response = {
            'status': ['state: '],
            'result': {
                'success': True,
            },
        }
        return JsonResponse(success_response)
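The view above builds a fixed response and never puts the task state into it. As a rough sketch of what a polling response could carry, the helper below turns any object with state and result attributes (which celery.result.AsyncResult exposes) into a JSON-friendly dict. The helper name build_status_payload, the payload keys, and the sample values are all made up for illustration; SimpleNamespace objects stand in for real AsyncResult instances so the snippet runs without a broker.

```python
from types import SimpleNamespace


def build_status_payload(task):
    """Turn an AsyncResult-like object into a JSON-serialisable dict."""
    state = task.state
    payload = {
        'status': state,
        'result': {'success': state == 'SUCCESS'},
    }
    if state == 'SUCCESS':
        # On success, .result holds the task's return value.
        payload['result']['value'] = task.result
    elif state == 'FAILURE':
        # On failure, .result holds the raised exception instance.
        payload['result']['error'] = str(task.result)
    return payload


# Stand-ins for celery.result.AsyncResult, used only for this demo.
pending = SimpleNamespace(state='PENDING', result=None)
finished = SimpleNamespace(state='SUCCESS', result={'rows_imported': 3})
failed = SimpleNamespace(
    state='FAILURE', result=ValueError('File type is not valid')
)

for task in (pending, finished, failed):
    print(build_status_payload(task))
```

In a real view the argument would be AsyncResult(task_id), and the returned dict could be passed straight to JsonResponse.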
This is my first time using Celery, so any help is welcome.
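So, the problem was that I was trying to fetch a model instance inside the task before that instance had actually been created. My solution was to create a function that calls my task through transaction.on_commit. The code is as follows: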
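import json

from django.db import transaction
from django.http import JsonResponse


def post(self, request, *args, **kwargs):
    form = self.get_form()
    if form.is_valid():
        instance = form.save()
        kwargs = {
            'file_id': instance.id,
            'file_type': instance.type,
        }
        # Build the signature first; freeze() assigns the task id up front
        # so it can be returned before the task is actually sent.
        task = import_file_task.signature(kwargs=kwargs)
        result = task.freeze()

        def run_task():
            task.apply_async()

        # Send the task only after the surrounding transaction commits.
        transaction.on_commit(run_task)
        return JsonResponse(
            {
                'form_status': 'Success',
                'task_id': result.task_id,
                'file_detail_url': self._get_detail_url(instance.id)
            }
        )
    else:
        return JsonResponse(
            {
                'form_status': 'Error',
                'error': json.loads(form.errors.as_json())
            }
        )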
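The race described above can be reproduced without Django or Celery. The sketch below is only an analogy, assuming the view was running inside a database transaction (the post does not say how transactions were configured): two sqlite3 connections play the web request and the worker, and the table name sale_file is made up. The "worker" cannot see the row until the "web" side commits, which is the gap transaction.on_commit closes.

```python
import os
import sqlite3
import tempfile


def fetch_file(conn, file_id):
    """Return the row for file_id as seen by this connection, or None."""
    return conn.execute(
        'SELECT id FROM sale_file WHERE id = ?', (file_id,)
    ).fetchone()


db_path = os.path.join(tempfile.mkdtemp(), 'demo.sqlite3')

# "web" plays the request handler, "worker" plays the Celery worker.
web = sqlite3.connect(db_path)
worker = sqlite3.connect(db_path)

web.execute('CREATE TABLE sale_file (id INTEGER PRIMARY KEY)')
web.commit()

# The INSERT implicitly opens a transaction that is not committed yet.
web.execute('INSERT INTO sale_file (id) VALUES (1)')

# A task started now would look up the row and find nothing.
seen_before_commit = fetch_file(worker, 1)

# After the commit the same lookup succeeds.
web.commit()
seen_after_commit = fetch_file(worker, 1)

print(seen_before_commit)
print(seen_after_commit)

web.close()
worker.close()
```

Dispatching the task from an on_commit callback corresponds to running the second lookup only: by the time the worker receives the message, the row is already visible.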