我在Google Cloud中有一个函数,它接受许多参数。我使用aiohttp:生成约2k个具有不同参数值组合的异步请求
# url = 'https://...'
# headers = {'X-Header': 'value'}
timeout = aiohttp.ClientTimeout(total=72000000)
async def submit_bt(session, url, payload):
async with session.post(url, json=payload) as resp:
result = await resp.text()
async def main():
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
tasks = []
gen = payload_generator() # a class that generates dictionaries
for payload in gen.param_grid():
tasks.append(asyncio.ensure_future(submit_bt(session, url, payload)))
bt_results = await asyncio.gather(*tasks)
for result in bt_results:
pass
asyncio.run(main())
一个函数运行需要3到6分钟,函数超时设置为9分钟,最大实例数设置为3000,但我从未见过超过150-200个实例被启动,即使提交的请求总数在1.5k到2.5k之间。在某些情况下,所有请求都会在20到30分钟内处理完毕,但有时我会在客户端遇到错误:
ClientOSError: [Errno 104] Connection reset by peer
这与服务器端的任何错误都不对应。我认为我应该能够将其作为aiohttp.client_exceptions.ClientOSError
异常捕获,但我不确定如何在异步设置中处理它,以便重新提交失败的请求,避免提前终止。如有任何提示,不胜感激。
@vaizki在评论中建议的解决方案似乎对我来说效果很好。仔细查看回溯后,发现异常是在submit_bt
协同例程中引发的,所以我添加了try-except子句:
async def submit_bt(session, url, payload):
try:
async with session.post(url, json=payload) as resp:
result = await resp.text()
except aiohttp.client_exceptions.ClientOSError as e:
await asyncio.sleep(3 + random.randint(0, 9))
async with session.post(url, json=payload) as resp:
result = await resp.text()
except Exception as e:
result = str(e)
return result
重复的行看起来不太优雅,但这对我来说仍然是一项正在进行的工作,代码结构在这个阶段还没有正式化。不管怎样,很明显我想实现什么:
- 将有效载荷发布到函数URL
- 捕获异常,但仅在
ClientOSError
的情况下重复post,并且仅重复一次
我不想使用while True
类型的循环,以避免在出现一些严重问题时无限执行。我按原样尝试了几次这个代码,我知道它经过了几次连接重置,直到任务列表结束,因为我得到了该函数生成的所有结果,所以即使是这种形式,它也足以满足我的情况。
您可以检查这个线程,这也是一个类似的问题。基于这个线程的答案:
云函数是无状态的,但可以重用以前调用的全局状态。提示和这些文档对此进行了解释。
使用全局状态和重试应该会给你一个更强大的功能:
您可以导入以下库,并可以在@retry方法中使用云函数。
from tenacity import retry, stop_after_attempt, wait_random
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
def function():