在使用asyncio的恰好100个请求之后,并行请求无限阻塞



我尝试过同时使用httpx和aiohttp,它们都有这个硬编码的限制。

import asyncio
import aiohttp
import httpx

async def main():
client = aiohttp.ClientSession() 
# client = httpx.AsyncClient(timeout=None)
coros = [
client.get(
"https://query1.finance.yahoo.com/v8/finance/chart/",
params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",},
)
for _ in range(500)
]
for i, coro in enumerate(asyncio.as_completed(coros)):
await coro
print(i, end=", ")

asyncio.run(main())

输出-

0、1、2、3、4、5、6、7、8、9、10、11、12、13、14、15、16、17、18、19、20、21、22、23、24、25、26、27、28、29、30、31、32、33、34、35、36、37、38、39、40、41、42、43、44、45、46、47、48、49、50、51、52、53、54、55、56、57、58、59、60、61、62、64、65、66、67、68、69、70、72、73、74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99

它刚刚停留在99与两个图书馆

但是,如果每个请求都使用一个新会话,则不会发生这种情况。

我做错了什么?异步的全部意义不就是让这样的事情变得容易吗?


我试着用线程、zmq和请求重写它,它运行得很好-

import zmq
N_WORKERS = 100
N_ITERS = 500
ctx = zmq.Context.instance()

def worker():
client = requests.Session()
pull = ctx.socket(zmq.PULL)
pull.connect("inproc://#1")
push = ctx.socket(zmq.PUSH)
push.connect("inproc://#2")
while True:
if not pull.recv_pyobj():
return
r = client.get(
"https://query1.finance.yahoo.com/v8/finance/chart/",
params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",},
)
push.send_pyobj(r.content)

def ventilator():
push = ctx.socket(zmq.PUSH)
push.bind("inproc://#1")
# distribute tasks to all workers
for _ in range(N_ITERS):
push.send_pyobj(True)
# close down workers
for _ in range(N_WORKERS):
push.send_pyobj(False)

# start workers & ventilator
threads = [Thread(target=worker) for _ in range(N_WORKERS)]
threads.append(Thread(target=ventilator))
for t in threads:
t.start()
# pull results from workers
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://#2")
for i in range(N_ITERS):
pull.recv_pyobj()
print(i, end=", ")
# wait for workers to exit
for t in threads:
t.join()

问题是client.get(...)向操作系统级套接字返回一个带有活动句柄的请求对象。未能关闭该对象会导致aiohttp耗尽套接字,即达到连接器限制,默认情况下为100。

要解决此问题,您需要关闭client.get()返回的对象,或者使用async with,这将确保在with块完成后立即关闭对象。例如:

async def get(client):
async with client.get(
"https://query1.finance.yahoo.com/v8/finance/chart/",
params={"symbol": "ADANIENT.NS", "interval": "2m", "range": "60d",}) as resp:
pass
async def main():
async with aiohttp.ClientSession() as client:
coros = [get(client) for _ in range(500)]
for i, coro in enumerate(asyncio.as_completed(coros)):
await coro
print(i, end=", ", flush=True)
asyncio.run(main())

此外,还应该关闭aiohttp.ClientSession对象,这也可以使用async with来实现,如上所示。

最新更新