运行时从__init__运行协程时出错



这是一个示例代码。

class Foo:
def __init__(self):
self._run_coro()
def _run_coro(self):
async def init():
bar = #some I/O op
self.bar = bar
loop = asyncio.get_event_loop()
loop.run_until_complete(init())
async def spam(self):
return await #I/O op
async def main():
foo = Foo()
await foo.spam()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

当我运行此代码时,出现以下异常:RuntimeError: This event loop is already running

如果我在main外部初始化Foo,代码运行没有任何异常。我想初始化Foo以便在初始化期间运行一个创建类属性bar的协程。

我无法弄清楚如何正确执行此操作。如何从__init__运行协程。

任何帮助将不胜感激。

class Foo:
def __init__(self):
self.session = requests.Session()
self.async_session = None
#I guess this can be done to initialize it. 
s = self.init_async_session()
try:
s.send(None)
except StopIteration:
pass
finally:
s.close()
async def init_async_session(self):
#ClientSession should be created inside a coroutine. 
self.async_session = aiohttp.ClientSession()

初始化self.async_session的正确方法是什么

如果某个方法使用异步的东西,它应该被显式定义为异步。这是asyncio背后的核心思想:让你以一种你总是知道某个任意方法是否可以异步做某事的方式编写代码。

在您的代码段中,您想在同步方法__init__中执行异步操作(barI/O),并且asyncio禁止它。您应该使_run_coro异步并异步初始化Foo,例如,使用__await__方法:

import asyncio

class Foo:
def __await__(self):
return self._run_coro().__await__()
async def _run_coro(self):  # real async initializer
async def init():
await asyncio.sleep(1)  # bar I/O
self.bar = 123
await init()
return self
async def spam(self):
return await asyncio.sleep(1)  # I/O op

async def main():
foo = await Foo()
await foo.spam()

asyncio.run(main())  # instead of two lines in Python 3.7+

您可能有兴趣阅读此答案以更好地了解asyncio的工作原理以及如何处理它。

更新:

s = self.init_async_session()
try:
s.send(None)

不要做这样的事情:生成器的方法只是协程的实现细节。您可以预测协程在调用方法时将如何反应.send()并且可以依赖此行为。

如果要执行协程使用await,如果要"在后台"启动它,请使用文档中asyncio的任务或其他功能。

初始化self.async_session的正确方法是什么

当涉及到aiohttp.ClientSession它不仅应该被创建,而且应该适当地关闭。最好的方法是使用异步上下文管理器,如aiohttp文档所示。

如果要在Foo中隐藏此操作,也可以将其设置为异步管理器。完整示例:

import aiohttp

class Foo:
async def __aenter__(self):
self._session = aiohttp.ClientSession()
await self._session.__aenter__()
return self
async def __aexit__(self, *args):
await self._session.__aexit__(*args)
async def spam(self):
url = 'http://httpbin.org/delay/1'
resp = await self._session.get(url)
text = await resp.text()
print(text)

async def main():
async with Foo() as foo:
await foo.spam()

asyncio.run(main())

更新2:

您可以组合从上方初始化/关闭对象的方法,以获得您喜欢的结果。只要您记住这两个操作都是异步的,因此应该等待,一切都应该没问题。

另一种可能的方法:

import asyncio
import aiohttp

class Foo:
def __await__(self):
return self._init().__await__()
async def _init(self):
self._session = aiohttp.ClientSession()
await self._session.__aenter__()
return self
async def close(self):
await self._session.__aexit__(None, None, None)
async def spam(self):
url = 'http://httpbin.org/delay/1'
resp = await self._session.get(url)
text = await resp.text()
print(text)

async def main():
foo = await Foo()
try:
await foo.spam()
finally:
await foo.close()

asyncio.run(main())

这是我的解决方案。

class Session:
def __init__(self, headers):
self._headers = headers
self._session = requests.Session()
self._async_session = None
async def _init(self):
self._session = aiohttp.ClientSession(headers=headers)
async def async_request(self, url):
while True:
try:
async with self._async_session.get(url) as resp:
resp.raise_for_status()
return await resp.text()
except aiohttp.client_exceptions.ClientError:
#retry or raise
except AttributeError:
if isinstance(self._async_session, aiohttp.ClientSession):
raise
await self._init()
def request(self, url):
return self._session.get(url).text
async def close(self):
if isinstance(self._async_session, aiohttp.ClientSession):
await self._session.close()
async def main():
session = Session({})
print(await session.async_request('https://httpstat.us/200')
await session.close()
asyncio.run(main())

我可以初始化Session类并发出同步和异步请求。我不必显式调用await session._init()来初始化self._async_session,因为当调用session._async_request并且self._async_session为 None 时,将调用await session._init()并重试请求。

最新更新