我正在使用 Celery 和 RabbitMQ 来处理来自 API 请求的数据。该过程如下:
请求> API> RabbitMQ> 芹菜工人>
返回理想情况下,我会生成更多的芹菜工人,但我受到内存限制的限制。
目前,我的流程中的瓶颈是从传递给工作线程的 URL 获取和下载数据。粗暴,过程如下所示:
def celery_gets_job(url):
data = fetches_url(url) # takes 0.1s to 1.0s (bottleneck)
result = processes_data(data) # takes 0.1s
return result
这是不可接受的,因为工作人员在获取 URL 时被锁定了一段时间。我正在考虑通过线程来改进这一点,但我不确定最佳实践是什么。
有没有办法让芹菜工人异步下载传入的数据,同时在不同的线程中同时处理数据?
我是否应该让单独的工作线程来获取和处理,通过某种形式的消息传递,可能通过 RabbitMQ?
使用 eventlet
库,您可以修补标准库以使其异步。
首先导入异步 urllib2:
from eventlet.green import urllib2
因此,您将获得包含以下内容的网址正文:
def fetch(url):
body = urllib2.urlopen(url).read()
return body
在此处查看更多eventlet
示例。
我将创建两个任务,一个用于下载数据,另一个用于在下载后处理数据。这样,您可以独立缩放这两个任务。请参阅:路由、链条。