使用Celey+RabbitMQ+RRedis实现订单管理服务



我已经实现了使用zipstream将一些文件动态打包到.zip文件中的功能,因此使用Flaskmimetype='application/zip'的格式进行响应。现在我想为当前用户实现提交订单(即生成.zip文件的任务(、跟踪订单状态(如STARTEDSUCCESSFAILURE(、撤销订单(即取消生成.zip文件的任务或删除生成的.zip文件(、下载订单(即.zip文件(的服务。我计划使用Celery,其中RabbitMQ是消息代理,Redis作为结果后端。

问题来了。仅仅Redis作为结果后端就足够了吗?因为跟踪用户订单的状态似乎涉及像user = ...这样的查询,而不是任务idAsyncResult查询的支持

更新:感谢@TomášLinhart。我遵循了在MongoDB中使用芹菜的signal handler mechanism存储结果的扩展方法。以下是与我创建的任务相关的代码片段。我创建task_trackercelery_cli实例的方式与@TomášLinhart的答案中的trackerapp相同。

# Import task_tracker and celery_cli here
...
@task_tracker.track
@celery_cli.task(name='pack-up-tif')
def async_pack_up_tif(**kwargs):
# Some processing here
...
def pack_up_tif(msg):
result = async_pack_up_tif.delay(**msg)
return result

但我还有一个问题。调用delay方法时如何拦截任务id?因为当触发task_success信号时,我需要通过find_one_and_update将该信息存储在MongoDB的集合中。

详细介绍我发布的评论并回答您更新的问题。确定从哪里收集你想在每个信号中跟踪的各种数据有点麻烦。有些信号为您提供任务本身,有些则提供任务请求。

我以这个task_success信号的处理程序结束:

def _on_task_success(self, sender, result, **other_kwargs):
if sender.name not in self.tasks:
return
collection = self.mongo 
.get_database(self.config['mongodb']['database']) 
.get_collection(self.config['mongodb']['collection'])
collection.find_one_and_update(
{'_id': sender.request.id},
{
'$setOnInsert': {
'name': sender.name,
'args': sender.request.args,
'kwargs': sender.request.kwargs
},
'$set': {
'status': states.SUCCESS,
'date_done': datetime.datetime.utcnow(),
'retries': sender.request.retries,
'group_id': sender.request.group,
'chord_id': sender.request.chord,
'root_id': sender.request.root_id,
'parent_id': sender.request.parent_id,
'result': result
},
'$push': {
'status_history': {
'date': datetime.datetime.utcnow(),
'status': states.SUCCESS
}
}
},
upsert=True,
return_document=ReturnDocument.AFTER)

最新更新