我正在使用Celery编写数据处理管道,因为这大大加快了速度。
请考虑以下伪代码:
从芹菜.结果导入结果集 从some_celery_app processing_task导入类型为 @app.task 的 # def crunch_data((: 结果 = 结果集([]( 对于 mongo.find(( 中的文档: #Around 100K - 1M 文档 作业 = processing_task.延迟(文档( 结果.添加(作业( 返回结果.get(( collected_data = crunch_data(( #Do 收集到的数据的一些东西,
我成功地生成了四个启用了并发的工作线程,当我运行此脚本时,数据会得到相应的处理,我可以做任何我想做的事情。
我使用 RabbitMQ 作为消息代理,rpc
作为后端。
当我打开 RabbitMQ 管理 UI 时,我看到了什么:
- 首先,处理所有文件
- 然后,也只有这样,集体
results.get()
调用才能检索到文件。
我的问题:有没有办法同时进行处理和后续检索?就我而言,由于所有文档都是相互不依赖的原子实体,因此似乎无需等待作业完全处理。
您可以在ResultSet.get(callback=cbResult)
中尝试回调参数,然后您可以在回调中处理结果。
def cbResult(task_id, value):
print(value)
results.get(callback=cbResult)