在aiohttp.ClientSession
的文档中,我读到,理想情况下,您应该为每个应用程序创建一个ClientSession
。我怀疑这应该读作"每个线程创建一个客户端会话"(我在python 3x中(。
import aiohttp
import asyncio
async def get_it_async(url: str, session):
async with session.get(url) as resp:
print(resp.status)
print(len(await resp.text()))
def run_it(fn, *args, **kwargs):
# New thread - get the loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
assert not loop.is_running()
return loop.run_until_complete(loop.create_task(fn(*args, **kwargs)))
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(2)
client = aiohttp.ClientSession()
await asyncio.wrap_future(pool.submit(run_it, get_it_async, 'httyp://nytimes.com', client))
给我:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-1-180a408a9698> in async-def-wrapper()
~appdataLocalProgramsPythonPython37libconcurrentfuturesthread.py in run(self)
55
56 try:
---> 57 result = self.fn(*self.args, **self.kwargs)
58 except BaseException as exc:
59 self.future.set_exception(exc)
<ipython-input-1-180a408a9698> in run_it(fn, *args, **kwargs)
15
16
---> 17 from concurrent.futures import ThreadPoolExecutor
18 pool = ThreadPoolExecutor(2)
19
~appdataLocalProgramsPythonPython37libasynciobase_events.py in run_until_complete(self, future)
585 raise RuntimeError('Event loop stopped before Future completed.')
586
--> 587 return future.result()
588
589 def stop(self):
<ipython-input-1-180a408a9698> in get_it_async(url, session)
6 print(resp.status)
7 print(len(await resp.text()))
----> 8
9 def run_it(fn, *args, **kwargs):
10 # New thread - get the loop.
i:gwattscodecalratio2019calms.venvlibsite-packagesaiohttpclient.py in __aenter__(self)
1010
1011 async def __aenter__(self) -> _RetType:
-> 1012 self._resp = await self._coro
1013 return self._resp
1014
i:gwattscodecalratio2019calms.venvlibsite-packagesaiohttpclient.py in _request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx)
424 timer = tm.timer()
425 try:
--> 426 with timer:
427 while True:
428 url, auth_from_url = strip_auth_from_url(url)
i:gwattscodecalratio2019calms.venvlibsite-packagesaiohttphelpers.py in __enter__(self)
577
578 if task is None:
--> 579 raise RuntimeError('Timeout context manager should be used '
580 'inside a task')
581
RuntimeError: Timeout context manager should be used inside a task
我的代码中是否存在错误,或者我的假设是否正确?
据我所知,ClientSession()
和所有嵌套对象都保存循环对象(例如self._loop
( 时实例化。当您尝试在另一个线程(因此另一个活动事件循环(中使用它时,ClientSession
内部代码找不到自己保存的循环的当前任务并引发此异常。
我认为对于每个新线程(以及每个新循环(,都应该实例化新的ClientSession
。
我怀疑这应该读作"每个线程创建一个客户端会话"(我在python 3x中(。
由于几个原因,这种解释是不正确的。首先,建议的意思与书面完全相同 - 每个应用程序通常使用一个ClientSession
,其中应用程序是执行许多独立(但松散相关(下载的程序。例如,网络爬虫将是一个应用程序,因此将是bittorrent客户端等。多个应用程序可以在单个进程中运行,在这种情况下,创建多个彼此完全独立的会话是有意义的。
第二个问题是 asyncio 是单线程的,所以说">每个线程一个会话"是没有意义的。asyncio 的卖点之一是,您可以实现可扩展的并行性,而不依赖于创建大量操作系统线程。创建线程以运行全新事件只是为了下载某些内容的代码在技术上是正确的,但没有必要,特别是因为您的顶级代码似乎一开始就在 asyncio 中运行!它还可能很复杂且容易出错,因为如果要在下载之间共享数据,则需要处理线程间同步问题。
在 asyncio 中实现并行性的方法是在流程中使用单个事件循环,并在其中执行所有操作。例如,以get_it_async
函数为起点,您可以并行下载三个 URL:
async def main():
async with aiohttp.ClientSession() as session:
await asyncio.gather(
get_it_async('https://nytimes.com', session),
get_it_async('https://yahoo.com', session),
get_it_async('https://cnn.com', session),
)
# or, await main() if you call main from another async function
asyncio.run(main())
像asyncio.wrap_future
这样的函数(更典型的是它的表亲loop.run_in_executor
(应该只在从 asyncio 应用程序中调用 CPU 密集型或遗留阻塞代码时使用。如果您有异步代码并从异步调用它,只需等待它或使用asyncio.gather()
和类似功能使其并行运行。