我正在一个FastAPI端点工作,它使I/O绑定操作,这是异步的效率。但是,这需要时间,所以我想缓存结果以便在一段时间内重用它。
我现在有这个:
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
@app.get('/')
async def get(key):
return await _get_expensive_resource(key)
if __name__ == "__main__":
import uvicorn
uvicorn.run("test:app")
我正在尝试使用cachetools
包来缓存结果,我尝试了以下内容:
import asyncio
from cachetools import TTLCache
from fastapi import FastAPI
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
class ResourceCache(TTLCache):
def __missing__(self, key):
loop = asyncio.get_event_loop()
resource = loop.run_until_complete(_get_expensive_resource(key))
self[key] = resource
return resource
resource_cache = ResourceCache(124, 300)
@app.get('/')
async def get(key: str):
return resource_cache[key]
if __name__ == "__main__":
import uvicorn
uvicorn.run("test2:app")
然而,这失败了,因为,据我所知,__missing__
方法是同步的,你不能从同步从异步调用异步。错误是:
RuntimeError: this event loop is already running.
如果我使用普通asyncio而不是uvloop,也会出现类似的错误。
对于asyncio事件循环,我尝试过使用nest_asyncio
包,但它不修补uvloop
,而且,即使在使用asyncio时,似乎服务在第一次使用后冻结。
你知道我怎样才能做到这一点吗?
其他遇到这种情况的人(包括我自己在15天内)的自动回答:
TTLCache
像普通的python字典一样工作,访问丢失的键将调用__missing__
方法。因此,如果存在,我们希望使用字典中的值,如果不存在,我们可以在此方法中收集资源。这个方法还应该在缓存中设置键(这样下次它就会出现),并返回这次使用的值。
class ResourceCache(TTLCache):
def __missing__(self, key) -> asyncio.Task:
# Create a task
resource_future = asyncio.create_task(_get_expensive_resource(key))
self[key] = resource_future
return resource_future
因此,我们有一个缓存(本质上是一个字典),它将键映射到asyncio.Tasks。任务将在事件循环中异步执行(已经由FastAPI启动)。当我们需要结果时,我们可以在端点代码或实际上任何地方为它们await
,只要它和async函数!@app.get("/")
async def get(key:str) -> bool:
return await resource_cache[key]
第二次调用该端点(在缓存超时时间内)将使用缓存的资源(在我们的示例中使用'true'模拟)。
下面是如何使用cachetools
库缓存FastAPI调用的示例,该库具有上述相同的异步函数,而不需要任何自定义类:
from fastapi import FastAPI
from cachetools import TTLCache
import asyncio
app = FastAPI()
# Create a cache with a maximum size of 100 entries and a TTL of 60 seconds
cache = TTLCache(maxsize=100, ttl=60)
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(5)
return True
@app.get("/{key}")
async def get(key):
# Check if the result is already in the cache
result = cache.get(key)
if result is not None:
print(f"Found it in cache for key {key}")
return result
result = await _get_expensive_resource(key)
# Store the result in the cache
cache[key] = result
return result
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
第一次调用路由时,计算结果并存储在缓存中。在接下来的60秒内对路由的后续调用将返回缓存的结果,而无需重新计算。
然后我从我的终端本地调用它
curl http://localhost:8000/mykey
第一个电话花了5秒钟,在第一分钟内,我执行的所有电话都得到了立即响应。