我有一个回溯测试函数,需要使用数百种可能的参数组合来运行。在我的笔记本电脑上按顺序运行它会非常耗时,所以我将该功能部署到谷歌云,希望我可以利用跨多个实例的扩展来并行处理多个请求。
我写了一个提交异步请求的简短脚本,类似于这样:
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
for (param1, param2, param3) in pd.MultiIndex.from_product([[1, 2, 3], [4, 6, 8], [8, 10, 12]]):
payload = {
'param1': param1,
'param2': param2,
'param3': param3,
}
await session.post(url, json=payload)
asyncio.run(main())
当然,我有更多的参数,但循环结构是一样的。我启动了脚本,根据存储桶中出现的工件判断,请求是按顺序处理的。我检查了函数度量,自提交请求以来,实例数量一直在1到2之间波动。当请求仍处于挂起状态时,我打开了功能详细信息,并将实例的最大数量设置为3000,这触发了重新部署,但最终什么都没有改变,使用新设置,实例数量仍然是1或2,挂起的请求继续按顺序处理。
我的问题是:
- 如果实例设置为3000
- 在什么情况下更多的情况会为该功能提供吗
- 该功能是使用部署的HTTP触发器-如果是Pub/Sub,会有什么不同吗触发器
- 我想实现并行执行(不一定一次3000个实例,但肯定不止2(-有可能吗使用谷歌云功能,或者我应该看看另一个他们的云平台上的产品
我对GCP上的无服务器系统还很陌生,因此非常感谢您的任何见解或建议。
根据@dim的评论,我所要做的就是使用asyncio.gather
,所以我将提交请求的代码更改为这样的代码:
import aiohttp
import asyncio
async def submit_func(session, url, payload):
async with session.post(url, json=payload) as resp:
result = await resp.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = []
for (param1, param2, param3) in pd.MultiIndex.from_product([[1, 2, 3], [4, 6, 8], [8, 10, 12]]):
payload = {
'param1': param1,
'param2': param2,
'param3': param3,
}
tasks.append(asyncio.ensure_future(submit_func(session, url, payload)))
resluts = await asyncio.gather(*tasks)
asyncio.run(main())
代码的灵感来自于这个博客中的例子。
我可以确认,使用asyncio.gather
向Google Cloud Function提交请求会导致并行处理,并通过在平台侧使用额外的活动实例(如果需要(进行扩展。