concurrent.futures关闭后等待时间过长



我有一个URL列表,我正试图从中获取JSON数据。我正在浏览10000个项目的列表,所以速度对我来说是关键。这在串行处理中花费了很长时间,所以我选择使用会话。我以前没有用过它,但我已经能够生产出足够快的东西。

ar_list=['https//:www.foo.com',...,'https//:www.foo2.com']
adapter = HTTPAdapter(pool_connections=workers_num, pool_maxsize=workers_num)
data_list = []
with sessions.FuturesSession(max_workers=workers_num) as session:
session.mount('http://', adapter)
session.mount('https://', adapter)  
futures = []
for idx, ar_url in enumerate(ar_list):
resp = session.get(ar_url,headers=headers)
futures.append(resp)
end_time = datetime.datetime.now()
time_delta=end_time-start_time

这运行得很好,并且在大约70us内运行一个查询(有10000个查询(。不幸的是,我在获取数据时遇到了问题。futures.append(resp)给了我一个Future对象,我需要在此对象上运行.result().json()才能真正获得可用的信息。然而,当我添加这些时,我的操作速度非常慢,每次查询大约0.4秒(比70us慢得多!(

我尝试过快速运行FutureSession(如上面的代码所示(,然后稍后使用对未来列表执行操作

循环的
  1. 列表理解:[item.result().json() for item in as_completed(futures)]
  2. lambda列表操作:list(map(lambda i: func(futures, i), range(0, len(futures))))
  3. 最后尝试另一个会话来执行操作:
data=[]
with concurrent.futures.ThreadPoolExecutor() as e:
fut = [e.submit(func, i) for i in futures]
for idx,r in enumerate(concurrent.futures.as_completed(fut)):
print(str(idx))
data.append(r.result())

现在,这个ThreadPoolExecutor也工作得很好,可以执行大约70us的任务。然而,我在with的开始和结束处进行测量,并注意到这两个with结构之间存在很大的延迟。看起来像这样:


with sessions.FuturesSession(max_workers=workers_num) as session:
#some fast operations here
"""
A VERY LARGE DELAY (about 60 seconds)
"""
with concurrent.futures.ThreadPoolExecutor() as e:
#some fast operations here

是否对我在两个with条件之间创建的需要很长时间才能解决的所有会话进行了清理?我可以看到,必须有比两个会话进程更好的方法来实现这一点,只需正确格式化一些json数据,但我似乎找不到更快的方法。

经过大量的研究,我发现我经历的最大等待时间是as_completed调用。正如@Simon所说,漫长的等待时间只是我在等待回复。如果我删除as_completed,它会变得和以前一样快,但当然响应是垃圾或None

我正在考虑避免使用带有回调的as_completed,以避免需要等待as_completed出现在期货上,只需从一个完整的列表中获取它们。希望我能很快带着";围绕"工作";

最新更新