我已经构建了一个基于aiohttp的小蜘蛛。这里有一些缩写代码:
import asyncio
from aiohttp import ClientSession
from threading import Thread
class Spider:
def __init__(self, urls):
self.urls = urls
self.start()
async def fetch(self, session, url):
async with session.get(url) as response:
await self.handle_status(response) # undefined here for brevity
return await self.render_body(response) # undefined here for brevity
async def process_urls(self):
async with ClientSession() as session:
tasks = {self.fetch(session, url) for url in self.urls}
for task in asyncio.as_completed(tasks):
raw_data = await task
data = self.extract_data(*raw_data) # sync method undefined here for brevity
await self.store_data(data) # undefined here for brevity
def start(self) -> None:
try:
asyncio.run(self.process_urls())
except RuntimeError: # loop already running
x = Thread(target=asyncio.run, args=(self.process_urls(),))
x.start()
x.join()
start方法旨在启动一个异步循环,但如果已经在运行,那么它将在一个新线程中启动一个新循环。
无论是否从现有循环运行,代码都能工作。但是,如果从现有循环运行(例如使用pytest.mark.asyncio(((,我会收到以下警告:
RuntimeWarning:从未等待协程'Spider.process_urls'x.join((
这个警告是我应该关心的吗?
有没有更好的方法来处理这个问题,而不会引起这个警告?
我尝试过使用loop.create_task(self.process_urls(((的loop=asyncio.get_running_roop((,而不是创建一个新线程,但失败的原因是:
运行时错误:协同程序被忽略生成器退出
我尝试过嵌套异步,但我的主要用例涉及另一个使用uvloop的库,因此嵌套异步不兼容。
我承认,我无法向您完整解释出现警告的原因,但我可以向您提供无警告的代码:
def start(self) -> None:
try:
asyncio.get_running_loop()
except RuntimeError:
asyncio.run(self.process_urls())
else:
# Since the loop is already instantiated we can use it with our own
# tasks without creating another thread.
# asyncio.create_task(self.process_urls())
x = Thread(target=asyncio.run, args=(self.process_urls(),))
x.start()
x.join()
可能,您得到RuntimeWarning
是因为您的代码在处理异常期间创建了新的线程(和BTW,即使您切换到asyncio.create_task()
,也会产生该警告(,其中";正常物理定律;不适用。但再说一遍,这只是我的猜测,所以请谨慎对待。