How do I use an asyncio/aiohttp web scraper with FastAPI?



I'm learning to scrape web pages using asyncio and aiohttp together with BeautifulSoup. I want to build a RESTful API that takes user input, scrapes the data, and returns the response as JSON. Here is what my scraper code looks like:

import asyncio
import aiohttp
from bs4 import BeautifulSoup, SoupStrainer

class TestScraper:
    def __init__(self, query):
        self.query = query

    async def main(self):
        urls = [
            f"https://books.toscrape.com/catalogue/page-{self.query}.html",
            f"https://quotes.toscrape.com/page/{self.query}/",
        ]

        def get_urls(session):
            tasks = []
            for url in urls:
                tasks.append(session.get(url))
            return tasks

        async with aiohttp.ClientSession() as session:
            tasks = get_urls(session)
            responses = await asyncio.gather(*tasks)
            for r in responses:
                if (str(r.url).split(".")[0][8:]) == "books":
                    soup = BeautifulSoup(
                        await r.read(), "lxml", parse_only=SoupStrainer("article")
                    )
                    books_list = []
                    for books in soup.find_all("article"):
                        book_name = books.find("h3").find("a").get("title")
                        book_price = books.find("p", class_="price_color").text
                        books_item = {
                            "book_name": book_name,
                            "book_price": book_price,
                        }
                        books_list.append(books_item)
                    yield books_list
                elif (str(r.url).split(".")[0][8:]) == "quotes":
                    soup = BeautifulSoup(
                        await r.read(),
                        "lxml",
                        parse_only=SoupStrainer("div", {"class": "quote"}),
                    )
                    quotes_list = []
                    for quotes in soup.find_all("div", class_="quote"):
                        quote_text = quotes.find("span", class_="text").get_text()
                        quote_author = quotes.find("small", class_="author").get_text()
                        quotes_item = {
                            "quote_text": quote_text,
                            "quote_author": quote_author,
                        }
                        quotes_list.append(quotes_item)
                    yield quotes_list
                else:
                    yield "No results found"

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(TestScraper(6).main())
# asyncio.run(TestScraper({query}).main())

On its own it works fine, but when I try to use it with FastAPI it returns an error. Even after making some changes I found online, the error is still there. Here is my FastAPI code:

import asyncio
from fastapi import FastAPI
from scrapers.books_quotes import TestScraper

app = FastAPI()

@app.get("/")
def root():
    return {"message": "Hello World"}

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    return asyncio.run(TestScraper(test_query).main())

The error I get is:

asyncio.run() cannot be called from a running event loop

How can I fix this?

asyncio.run is the top-level entry point for async code, and the FastAPI application (or whatever framework you use to run it) should already be calling it for you.
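You can reproduce the error without FastAPI: any call to asyncio.run from inside an already-running event loop fails the same way. A minimal standalone sketch (not from the original post):

import asyncio

async def inner():
    return 42

async def outer():
    # A loop is already running here, so this raises
    # RuntimeError: asyncio.run() cannot be called from a running event loop
    return asyncio.run(inner())

asyncio.run(outer())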

Normally, to run an async def function (= a coroutine) from within async code, you just await it:

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    return await TestScraper(test_query).main()

In your example, however, TestScraper.main is not a normal coroutine but an asynchronous generator (because it contains yield statements). You run it by iterating over it with an async for loop:

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    async for result in TestScraper(test_query).main():
        ...  # do something with each result
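For example, one way to finish that loop is to collect everything the generator yields and return it as a single JSON response. A minimal sketch of that idea (flattening everything into one results list is my assumption, not part of the original answer):

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    results = []
    # Each iteration receives one list yielded by TestScraper.main.
    async for result in TestScraper(test_query).main():
        results.append(result)
    return results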

Update: instead of creating a separate list for each URL in the TestScraper code, I created a single list for all URLs:

# same code as before
async with aiohttp.ClientSession() as session:
    tasks = get_urls(session)
    responses = await asyncio.gather(*tasks)
    results = []
    for r in responses:
        if (str(r.url).split(".")[0][8:]) == "books":
            soup = BeautifulSoup(
                await r.read(), "lxml", parse_only=SoupStrainer("article")
            )
            for books in soup.find_all("article"):
                book_name = books.find("h3").find("a").get("title")
                book_price = books.find("p", class_="price_color").text
                books_item = {
                    "book_name": book_name,
                    "book_price": book_price,
                }
                results.append(books_item)
        elif (str(r.url).split(".")[0][8:]) == "quotes":
            soup = BeautifulSoup(
                await r.read(),
                "lxml",
                parse_only=SoupStrainer("div", {"class": "quote"}),
            )
            for quotes in soup.find_all("div", class_="quote"):
                quote_text = quotes.find("span", class_="text").get_text()
                quote_author = quotes.find("small", class_="author").get_text()
                quotes_item = {
                    "quote_text": quote_text,
                    "quote_author": quote_author,
                }
                results.append(quotes_item)
        else:
            results.append({"error": f"No results found for {r.url}"})
    yield results
    # print(results)
# same code as before

And following @mkrieger1's answer, I changed the FastAPI file (main.py) as follows:

# same code as before
@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    async for results in TestScraper(test_query).main():
        return results

Now everything works fine. Thanks for reading, and have a nice day.
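As a closing side note (my suggestion, not part of the thread): since the updated main yields exactly once, it could also be rewritten as a plain coroutine that returns the results list instead of yielding it; the endpoint then shrinks back to a single await, as in @mkrieger1's first snippet:

# Sketch: in TestScraper.main, replace "yield results" with "return results"
# (so main is no longer an async generator). The endpoint then becomes:

@app.get("/test/{test_query}")
async def read_test_items(test_query: str):
    return await TestScraper(test_query).main()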
