我正在尝试发送~400个HTTP GET请求并收集结果。我正在从姜戈跑。我的解决方案是将芹菜与gevent一起使用。
要开始我称之为get_reports的芹菜任务:
def get_reports(self, clients, *args, **kw):
sub_tasks = []
for client in clients:
s = self.get_report_task.s(self, client, *args, **kw).set(queue='io_bound')
sub_tasks.append(s)
res = celery.group(*sub_tasks)()
reports = res.get(timeout=30, interval=0.001)
return reports
@celery.task
def get_report_task(self, client, *args, **kw):
report = send_http_request(...)
return report
我使用 4 个工人:
manage celery worker -P gevent --concurrency=100 -n a0 -Q io_bound
manage celery worker -P gevent --concurrency=100 -n a1 -Q io_bound
manage celery worker -P gevent --concurrency=100 -n a2 -Q io_bound
manage celery worker -P gevent --concurrency=100 -n a3 -Q io_bound
我使用RabbitMq作为经纪人。
尽管它的工作速度比按顺序运行请求要快得多(400 个请求需要 ~23 秒),但我注意到大部分时间都是芹菜本身的开销,即如果我像这样更改get_report_task:
@celery.task
def get_report_task(self, client, *args, **kw):
return []
整个操作耗时 ~19 秒。这意味着我只花 19 秒将所有任务发送到芹菜并返回结果
兔子mq的消息排队率似乎绑定到28条消息/秒,我认为这是我的瓶颈。
如果这很重要,我正在 win 8 机器上运行。
我尝试过的一些事情:
- 使用 Redis 作为代理
- 使用 Redis 作为结果后端
使用这些设置进行调整
BROKER_POOL_LIMIT = 500
CELERYD_PREFETCH_MULTIPLIER = 0
CELERYD_MAX_TASKS_PER_CHILD = 100
CELERY_ACKS_LATE = 假
CELERY_DISABLE_RATE_LIMITS = 真
我正在寻找任何有助于加快速度的建议。
你真的在没有虚拟机的情况下在Windows 8上运行吗?我在运行OS X 10.7的2核Macbook 8GB RAM上进行了以下简单测试:
import celery
from time import time
@celery.task
def test_task(i):
return i
grp = celery.group(test_task.s(i) for i in range(400))
tic1 = time(); res = grp(); tac1 = time()
print 'queued in', tac1 - tic1
tic2 = time(); vals = res.get(); tac2 = time()
print 'executed in', tac2 - tic2
我正在使用 Redis 作为代理,Postgres 作为结果后端和具有 --concurrency=4
的默认工作线程。猜猜输出是什么?在这里:
在 3.5009469986 中排队
执行于 2.99818301201
事实证明,我有两个单独的问题。
首先,任务是一个成员方法。从课堂上提取出来后,时间下降到大约 12 秒。我只能假设这与自我的腌制有关。
第二件事是它在窗口上运行的事实。在我的 linux 机器上运行它后,运行时间不到 2 秒。猜测窗户只是不是为了高性能而切割的。
改用扭曲怎么样?您可以获得更简单的应用程序结构。你可以一次从 django 进程发送所有 400 个请求,并等待所有请求完成。这同时工作,因为扭曲将套接字设置为非阻塞模式,并且仅在数据可用时读取数据。
不久前我遇到了类似的问题,我在twisted和django之间建立了一个很好的桥梁。我已经在生产环境中运行它将近一年了。你可以在这里找到它:https://github.com/kowalski/featdjango/。简而言之,它让主应用程序线程运行主扭曲反应器循环,并且 django 视图结果委托给线程。它使用一个特殊的线程池,该线程池公开了与 reactor 交互并使用其异步功能的方法。
如果你使用它,你的代码将如下所示:
from twisted.internet import defer
from twisted.web.client import getPage
import threading
def get_reports(self, urls, *args, **kw):
ct = threading.current_thread()
defers = list()
for url in urls:
# here the Deferred is created which will fire when
# the call is complete
d = ct.call_async(getPage, args=[url] + args, kwargs=kw)
# here we keep it for reference
defers.append(d)
# here we create a Deferred which will fire when all the
# consiting Deferreds are completed
deferred_list = defer.DeferredList(defers, consumeErrors=True)
# here we tell the current thread to wait until we are done
results = ct.wait_for_defer(deferred_list)
# the results is a list of the form (C{bool} success flag, result)
# below unpack it
reports = list()
for success, result in results:
if success:
reports.append(result)
else:
# here handle the failure, or just ignore
pass
return reports
这仍然是您可以进行大量优化的东西。在这里,每次调用getPage()都会创建一个单独的TCP连接,并在完成后关闭它。这是尽可能最佳的,前提是您的 400 个请求中的每一个都发送到不同的主机。如果不是这种情况,则可以使用 http 连接池,该池使用持久连接和 http 管道。你像这样实例化它:
from feat.web import httpclient
pool = httpclient.ConnectionPool(host, port, maximum_connections=3)
而不是像这样执行单个请求(这改为getPage()调用):
d = ct.call_async(pool.request, args=(method, path, headers, body))