我已经实现了使用zipstream
将一些文件动态打包到.zip
文件中的功能,因此使用Flask以mimetype='application/zip'
的格式进行响应。现在我想为当前用户实现提交订单(即生成.zip
文件的任务(、跟踪订单状态(如STARTED
、SUCCESS
、FAILURE
(、撤销订单(即取消生成.zip
文件的任务或删除生成的.zip
文件(、下载订单(即.zip
文件(的服务。我计划使用Celery,其中RabbitMQ是消息代理,Redis作为结果后端。
问题来了。仅仅Redis作为结果后端就足够了吗?因为跟踪用户订单的状态似乎涉及像user = ...
这样的查询,而不是任务id对AsyncResult
查询的支持
更新:感谢@TomášLinhart。我遵循了在MongoDB中使用芹菜的signal handler mechanism
存储结果的扩展方法。以下是与我创建的任务相关的代码片段。我创建task_tracker
和celery_cli
实例的方式与@TomášLinhart的答案中的tracker
和app
相同。
# Import task_tracker and celery_cli here
...
@task_tracker.track
@celery_cli.task(name='pack-up-tif')
def async_pack_up_tif(**kwargs):
# Some processing here
...
def pack_up_tif(msg):
result = async_pack_up_tif.delay(**msg)
return result
但我还有一个问题。调用delay
方法时如何拦截任务id?因为当触发task_success
信号时,我需要通过find_one_and_update
将该信息存储在MongoDB的集合中。
详细介绍我发布的评论并回答您更新的问题。确定从哪里收集你想在每个信号中跟踪的各种数据有点麻烦。有些信号为您提供任务本身,有些则提供任务请求。
我以这个task_success
信号的处理程序结束:
def _on_task_success(self, sender, result, **other_kwargs):
if sender.name not in self.tasks:
return
collection = self.mongo
.get_database(self.config['mongodb']['database'])
.get_collection(self.config['mongodb']['collection'])
collection.find_one_and_update(
{'_id': sender.request.id},
{
'$setOnInsert': {
'name': sender.name,
'args': sender.request.args,
'kwargs': sender.request.kwargs
},
'$set': {
'status': states.SUCCESS,
'date_done': datetime.datetime.utcnow(),
'retries': sender.request.retries,
'group_id': sender.request.group,
'chord_id': sender.request.chord,
'root_id': sender.request.root_id,
'parent_id': sender.request.parent_id,
'result': result
},
'$push': {
'status_history': {
'date': datetime.datetime.utcnow(),
'status': states.SUCCESS
}
}
},
upsert=True,
return_document=ReturnDocument.AFTER)