Ratelimit in Fastapi



如何在Fastapi应用程序中对API端点请求进行评级限制?我需要对每个用户每秒API调用5个请求进行评级限制,超过该限制会阻止特定用户60秒。

在主.py 中

def get_application() -> FastAPI:
application = FastAPI(title=PROJECT_NAME, debug=DEBUG, version=VERSION)
application.add_event_handler(
"startup", create_start_app_handler(application))
application.add_event_handler(
"shutdown", create_stop_app_handler(application))
return application
app = get_application()

在事件中.py

def create_start_app_handler(app: FastAPI) -> Callable:  
async def start_app() -> None:           
redis = await aioredis.create_redis_pool("redis://localhost:8080")
FastAPILimiter.init(redis)
return start_app

端点内

@router.post('/user',
tags=["user"],
name="user:user", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
***code****

从这个文件test.py.运行

import uvicorn
from app.main import app
if __name__ == "__main__":
uvicorn.run("test:app", host="0.0.0.0", port=8000, reload=True)

我按照上面的方式编辑,但出现了以下错误。

File "****ite-packagesstarletterouting.py", line 526, in lifespan
async for item in self.lifespan_context(app):
File "****site-packagesstarletterouting.py", line 467, in default_lifespan
await self.startup()
File "****site-packagesstarletterouting.py", line 502, in startup
await handler()
File "****appcoreservicesevents.py", line 15, in start_app
redis = await aioredis.create_redis_pool("redis://localhost:8080")
File "****site-packagesaiorediscommands__init__.py", line 188, in create_redis_pool
pool = await create_pool(address, db=db,
File "****site-packagesaioredispool.py", line 58, in create_pool
await pool._fill_free(override_min=False)
File "C****site-packagesaioredispool.py", line 383, in _fill_free
conn = await self._create_new_connection(self._address)
File "****site-packagesaioredisconnection.py", line 111, in create_connection
reader, writer = await asyncio.wait_for(open_connection(
File "****asynciotasks.py", line 455, in wait_for
return await fut
File "****site-packagesaioredisstream.py", line 23, in open_connection
transport, _ = await get_event_loop().create_connection(
File "****asynciobase_events.py", line 1033, in create_connection
raise OSError('Multiple exceptions: {}'.format(
OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 8080, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 8080)

最佳选项是使用库,因为FastAPI不提供开箱即用的功能。

slowapi很棒,而且很容易使用。

你可以这样使用ut。

from fastapi import FastAPI
from slowapi.errors import RateLimitExceeded
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/home")
@limiter.limit("5/minute")
async def homepage(request: Request):
return PlainTextResponse("test")
@app.get("/mars")
@limiter.limit("5/minute")
async def homepage(request: Request, response: Response):
return {"key": "value"}

FastAPI本机不支持这一点,但有一些库(如下面的库(是可能的,但通常需要某种数据库支持(redis、memcached等(,尽管slowapi在没有数据库的情况下有内存回退。

  • https://pypi.org/project/fastapi-limiter/
  • https://pypi.org/project/slowapi/

为了使用fastapi-limiter,如他们的文档所示:

注意:您需要一个正在运行的Redis才能正常工作。

import aioredis
import uvicorn
from fastapi import Depends, FastAPI
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
app = FastAPI()

@app.on_event("startup")
async def startup():
redis = await aioredis.create_redis_pool("redis://localhost")
FastAPILimiter.init(redis)

@app.get("/", dependencies=[Depends(RateLimiter(times=2, seconds=5))])
async def index():
return {"msg": "Hello World"}

if __name__ == "__main__":
uvicorn.run("main:app", debug=True, reload=True)

您可以使用https://github.com/abersheeran/asgi-ratelimit

与https://pypi.org/project/fastapi-limiter/和https://pypi.org/project/slowapi/,它可以更好地满足您的需求。

这是一个例子:在超过每秒五次访问限制后,阻止特定用户60秒。

app.add_middleware(
RateLimitMiddleware,
authenticate=AUTH_FUNCTION,
backend=RedisBackend(),
config={
r"^/user": [Rule(second=5, block_time=60)],
},
)

fastapi-limiterslowapi是实现Ratelimit in Fastapi的非常漂亮的包。

但使用walrus也可以做到。但应该启动redis数据库。

  1. 启动redis

  2. python代码:编写python文件:code1228.py

  3. 代码:

from walrus import Database, RateLimitException
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
db = Database()
rate = db.rate_limit('xxx', limit=5, per=60)  # in 60s just can only click 5 times
app = FastAPI()

@app.exception_handler(RateLimitException)
def parse_rate_litmit_exception(request: Request, exc: RateLimitException):
msg = {'success': False, 'msg': f'please have a tea for sleep, your ip is: {request.client.host}.'}
return JSONResponse(status_code=429, content=msg)

@app.get('/')
def index():
return {'success': True}

@app.get('/important_api')
@rate.rate_limited(lambda request: request.client.host)
def query_important_data(request: Request):
data = 'important data'
return {'success': True, 'data': data}

if __name__ == "__main__":
uvicorn.run("code1228:app", debug=True, reload=True)
  1. 运行这个python文件。

  2. 测试链接。http://127.0.0.1:8000/important_api

与其使用外部包来存储只需要几秒钟或几分钟的数据,我更喜欢使用以下内容:

在类RateLimiterrequests_limit中,请求使用的总限制可以在time_window秒内执行。

Remember:我使用客户端IP来跟踪他们的请求行为,并在requests_limit超过time_window秒内设置限制。

Note:这是一个依赖项,所以它可能无法与快速API的中间件一起工作(我还没有测试过(

from fastapi import FastAPI, Request, HTTPException, Depends
import time
# Initialize FastAPI app
app = FastAPI()
# In-memory storage for request counters
request_counters = {}
# Custom RateLimiter class with dynamic rate limiting values per route
class RateLimiter:
def __init__(self, requests_limit: int, time_window: int):
self.requests_limit = requests_limit
self.time_window = time_window
async def __call__(self, request: Request):
client_ip = request.client.host
route_path = request.url.path
# Get the current timestamp
current_time = int(time.time())
# Create a unique key based on client IP and route path
key = f"{client_ip}:{route_path}"
# Check if client's request counter exists
if key not in request_counters:
request_counters[key] = {"timestamp": current_time, "count": 1}
else:
# Check if the time window has elapsed, reset the counter if needed
if current_time - request_counters[key]["timestamp"] > self.time_window:
# Reset the counter and update the timestamp
request_counters[key]["timestamp"] = current_time
request_counters[key]["count"] = 1
else:
# Check if the client has exceeded the request limit
if request_counters[key]["count"] >= self.requests_limit:
raise HTTPException(status_code=429, detail="Too Many Requests")
else:
request_counters[key]["count"] += 1
# Clean up expired client data (optional)
for k in list(request_counters.keys()):
if current_time - request_counters[k]["timestamp"] > self.time_window:
request_counters.pop(k)
return True
# Include the custom RateLimiter dependency on specific routes
@app.get("/limited", dependencies=[Depends(RateLimiter(requests_limit=10, time_window=60))])
async def limited_endpoint():
return {"message": "This endpoint has rate limiting (10 requests per 60 seconds)."}
@app.get("/limited/other", dependencies=[Depends(RateLimiter(requests_limit=5, time_window=60))])
async def limited_other_endpoint():
return {"message": "This endpoint has rate limiting (5 requests per 60 seconds)."}
@app.get("/unlimited")
async def unlimited_endpoint():
return {"message": "This endpoint has no rate limiting."}

我们使用CCD_ 17方法创建了一个自定义RateLimiter类,以接受自定义速率限制值CCD_。该类实现了__call__方法,该方法允许将类实例用作可调用的依赖项。