芹菜辅助角色中的多线程



我正在使用 Celery 和 RabbitMQ 来处理来自 API 请求的数据。该过程如下:

请求> API> RabbitMQ> 芹菜工人>

返回

理想情况下,我会生成更多的芹菜工人,但我受到内存限制的限制。

目前,我的流程中的瓶颈是从传递给工作线程的 URL 获取和下载数据。粗暴,过程如下所示:

def celery_gets_job(url):
    data = fetches_url(url)       # takes 0.1s to 1.0s (bottleneck)
    result = processes_data(data) # takes 0.1s
    return result

这是不可接受的,因为工作人员在获取 URL 时被锁定了一段时间。我正在考虑通过线程来改进这一点,但我不确定最佳实践是什么。

  • 有没有办法让芹菜工人异步下载传入的数据,同时在不同的线程中同时处理数据?

  • 我是否应该让单独的工作线程来获取和处理,通过某种形式的消息传递,可能通过 RabbitMQ?

使用 eventlet 库,您可以修补标准库以使其异步。

首先导入异步 urllib2:

from eventlet.green import urllib2

因此,您将获得包含以下内容的网址正文:

def fetch(url):
    body = urllib2.urlopen(url).read()
    return body

在此处查看更多eventlet示例。

我将创建两个任务,一个用于下载数据,另一个用于在下载后处理数据。这样,您可以独立缩放这两个任务。请参阅:路由、链条。

相关内容

  • 没有找到相关文章

最新更新