如何在 Celery 中异步接收已处理的消息?



我正在使用Celery编写数据处理管道,因为这大大加快了速度。

请考虑以下伪代码:

从芹菜.结果导入结果集 从some_celery_app processing_task导入类型为 @app.task 的 # def crunch_data((: 结果 = 结果集([]( 对于 mongo.find(( 中的文档: #Around 100K - 1M 文档 作业 = processing_task.延迟(文档( 结果.添加(作业( 返回结果.get(( collected_data = crunch_data(( #Do 收集到的数据的一些东西,

我成功地生成了四个启用了并发的工作线程,当我运行此脚本时,数据会得到相应的处理,我可以做任何我想做的事情。

我使用 RabbitMQ 作为消息代理,rpc作为后端。

当我打开 RabbitMQ 管理 UI 时,我看到了什么:

  1. 首先,处理所有文件
  2. 然后,也只有这样,集体results.get()调用才能检索到文件。

我的问题:有没有办法同时进行处理和后续检索?就我而言,由于所有文档都是相互不依赖的原子实体,因此似乎无需等待作业完全处理。

您可以在ResultSet.get(callback=cbResult)中尝试回调参数,然后您可以在回调中处理结果。

def cbResult(task_id, value):
print(value)
results.get(callback=cbResult)

相关内容

  • 没有找到相关文章

最新更新