缓存导致异步环境



我正在一个FastAPI端点工作,它使I/O绑定操作,这是异步的效率。但是,这需要时间,所以我想缓存结果以便在一段时间内重用它。

我现在有这个:

from fastapi import FastAPI
import asyncio
app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
@app.get('/')
async def get(key):
return await _get_expensive_resource(key)
if __name__ == "__main__":
import uvicorn
uvicorn.run("test:app")

我正在尝试使用cachetools包来缓存结果,我尝试了以下内容:

import asyncio
from cachetools import TTLCache
from fastapi import FastAPI

app = FastAPI()
async def _get_expensive_resource(key) -> None:
await asyncio.sleep(2)
return True
class ResourceCache(TTLCache):
def __missing__(self, key):
loop = asyncio.get_event_loop()
resource = loop.run_until_complete(_get_expensive_resource(key))
self[key] = resource
return resource
resource_cache = ResourceCache(124, 300)
@app.get('/')
async def get(key: str):
return resource_cache[key]
if __name__ == "__main__":
import uvicorn
uvicorn.run("test2:app")

然而,这失败了,因为,据我所知,__missing__方法是同步的,你不能从同步从异步调用异步。错误是:

RuntimeError: this event loop is already running.

如果我使用普通asyncio而不是uvloop,也会出现类似的错误。

对于asyncio事件循环,我尝试过使用nest_asyncio包,但它不修补uvloop,而且,即使在使用asyncio时,似乎服务在第一次使用后冻结。

你知道我怎样才能做到这一点吗?

其他遇到这种情况的人(包括我自己在15天内)的自动回答:

TTLCache像普通的python字典一样工作,访问丢失的键将调用__missing__方法。因此,如果存在,我们希望使用字典中的值,如果不存在,我们可以在此方法中收集资源。这个方法还应该在缓存中设置键(这样下次它就会出现),并返回这次使用的值。

class ResourceCache(TTLCache):
def __missing__(self, key) -> asyncio.Task:
# Create a task 
resource_future = asyncio.create_task(_get_expensive_resource(key))
self[key] = resource_future
return resource_future
因此,我们有一个缓存(本质上是一个字典),它将键映射到asyncio.Tasks。任务将在事件循环中异步执行(已经由FastAPI启动)。当我们需要结果时,我们可以在端点代码或实际上任何地方为它们await,只要它和async函数!
@app.get("/")
async def get(key:str) -> bool:
return await resource_cache[key]

第二次调用该端点(在缓存超时时间内)将使用缓存的资源(在我们的示例中使用'true'模拟)。

下面是如何使用cachetools库缓存FastAPI调用的示例,该库具有上述相同的异步函数,而不需要任何自定义类:

from fastapi import FastAPI
from cachetools import TTLCache
import asyncio
app = FastAPI()
# Create a cache with a maximum size of 100 entries and a TTL of 60 seconds
cache = TTLCache(maxsize=100, ttl=60)

async def _get_expensive_resource(key) -> None:
await asyncio.sleep(5)
return True

@app.get("/{key}")
async def get(key):
# Check if the result is already in the cache
result = cache.get(key)
if result is not None:
print(f"Found it in cache for key {key}")
return result
result = await _get_expensive_resource(key)
# Store the result in the cache
cache[key] = result
return result

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

第一次调用路由时,计算结果并存储在缓存中。在接下来的60秒内对路由的后续调用将返回缓存的结果,而无需重新计算。

然后我从我的终端本地调用它

curl http://localhost:8000/mykey

第一个电话花了5秒钟,在第一分钟内,我执行的所有电话都得到了立即响应。

相关内容

  • 没有找到相关文章

最新更新