使用pythonAiohttp限制每分钟的并发和控制请求



有一款名为《激战2》的游戏,它为我们提供了查询游戏数据库中几乎所有内容的API。我的目标是使用python asyncio和aiohttp编写一个简单的爬网程序,并从guild wars 2游戏数据库中获取所有项目的信息。

我写了一个简短的程序,这是一项工作,但它的行为有点奇怪,我想我对编写协同程序有些不理解。

首先,我用Postman应用程序提出了一个请求。在响应标头中,有X-Rate-Limit-Limit,600。所以我想请求限制在每分钟600次?

这是我的问题。

1、 程序结束后。我检查了一些JSON文件,它们有相同的内容

[{"name": "Endless Fractal Challenge Mote Tonic", "description": "Transform into a Challenge Mote for 15 minutes or until hit. You cannot move while transformed."......

这意味着请求得到了糟糕的回应,但我不知道为什么。

2、 我尝试了异步。信号量,但即使我将并发限制在5,请求也很快超过600。所以我试图通过在request_item函数的末尾添加time.sleep(0.2(来控制时间。我猜time.sleep(0.2(会将整个python进程挂起0.2秒,实际上它是有效的,但在执行了一段时间后,程序挂起了很长一段时间,然后发出了很多失败的尝试。每次自动重试仍然失败。我对这种行为感到困惑。

async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
# retry for 3 times when exception occurs.
for i in range(3):
try:
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
time.sleep(0.2)

当我将time.sleep(0.2(移动到request_item函数内的for循环中时,整个程序挂起。我不知道发生了什么。

async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
for i in range(3):
try:
time.sleep(0.2)
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue

有人能解释一下吗?还有更好的解决方案吗?我想有一些解决方案,但我无法测试。比如,获取loop.time((,并为每600个请求挂起整个事件循环。或者,将600个请求添加到task_list中,并将它们收集为一个组,完成后,再次使用另外600个请求执行asyncio.run(get_item(req_ids((。

这是我所有的代码。

import aiohttp
import asyncio
import httpx
import json
import math
import os
import time
tk = 'xxxxxxxx'
url_template = 'https://api.guildwars2.com/v2/items'
# get items list
req_param = {'access_token': tk}
item_list_resp = httpx.get(url_template, params=req_param)
items = item_list_resp.json()
async def request_item(session, item_id):
req_param_item = req_param
req_param_item['ids'] = item_id
for i in range(3):
try:
async with session.get(url_template, params=req_param_item) as response:
result = await response.json()
with open(f'item_info/{item_id}.json', 'w') as f:
json.dump(result, f)
print(item_id, 'done')
break
except Exception as e:
print(item_id, i, 'failed')
continue
# since the game API limit requests, I think it's ok to suspend program for a while
time.sleep(0.2)
async def get_item(item_ids: list):
task_list = []
async with aiohttp.ClientSession() as session:
for item_id in item_ids:
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task) 
await asyncio.gather(*task_list)
asyncio.run(get_item(req_ids))

您使用的是time.sleep()而不是await asyncio.sleep()。这是N秒的阻塞孔执行,并且在错误的地方执行。

事情是这样的。当你运行

for item_id in item_ids:
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)

你只是安排你的请求,但不运行它。(例如,你有1000个item_ids(所以你安排了1000个任务,当你运行await asyncio.gather(*task_list)时,你实际上等待的1000个任务都会被执行。他们会立刻开火。

但在每个任务中,您都要运行time.sleep(0.2),并且必须等待1000*0.2秒。记住,所有任务都是一次性运行的,而且通常是按随机顺序运行的。所以你运行任务1并等待0.2秒,然后启动任务2并等待0.2秒钟,然后任务999启动并等待0.2秒内,依此类推

最简单的解决方案是在触发600个请求后等待一分钟。你需要在get_item内减速。示例代码(我不测试它(:

async def get_item(item_ids: list):
task_list = []
async with aiohttp.ClientSession() as session:
for n, item_id in enumerate(item_ids):
req = request_item(session, item_id)
task = asyncio.create_task(req)
task_list.append(task)
if n % 600 == 0:
await asyncio.gather(*task_list)
await asyncio.sleep(60)
task_list = []

我建议您使用库异步油门。

PS。由于速率限制为每分钟600,我认为您不需要asyncio,因为我非常确定在5-10秒内会执行600个并发请求。两次检查您的600个请求是否使用带线程的经典requests需要1分钟以上的时间。

最新更新