FastAPI 在返回大量 JSON 数据方面非常慢



我有一个返回大量JSON数据的FastAPIGET端点(~160,000行和45列)。不出所料,使用json.dumps()返回数据非常慢。我首先使用json.loads()从文件中读取数据,并根据输入的参数对其进行过滤。有没有比使用return data更快的方法将数据返回给用户?在当前状态下需要将近一分钟的时间。

我的代码目前如下所示:

# helper function to parse parquet file (where data is stored)
def parse_parquet(file_path):
df = pd.read_parquet(file_path)
result = df.to_json(orient = 'records')
parsed = json.loads(result)
return parsed

@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
if year is None:
data = parse_parquet(f'path/{year}_data.parquet')
# no year
if year is not None:
data = parse_parquet(f'path/all_data.parquet')
if id is not None:
data = [d for d in data if d['id'] == id]
return data

响应速度如此慢的原因之一是,在您的parse_parquet()方法中,您首先将文件转换为 JSON(使用df.to_json()),然后转换为字典(使用json.loads()),最后再次转换为 JSON,因为 FastAPI 在幕后使用jsonable_encoder自动将返回的值转换为与 JSON 兼容的数据,然后使用 Python 标准json.dumps()来序列化对象——这个过程非常慢(见这里)回答了解更多详情)。

正如@MatsLindh在评论部分所建议的那样,您可以使用替代的 JSON 编码器,例如 orjson 或 ujosn(也请参阅此答案),与让 FastAPI 使用jsonable_encoder然后使用标准json.dumps()将数据转换为 JSON 相比,这确实会加快该过程。但是,使用 pandasto_json()并直接重新设置自定义Response(如本答案的选项1(更新 2)中所述)似乎是性能最佳的解决方案。您可以使用下面给出的代码(使用自定义APIRoute类)来比较所有可用解决方案的响应时间。

使用您自己的镶木地板文件或下面的代码创建由 160K 行和 45 列组成的示例镶木地板文件。

create_parquet.py

import pandas as pd
import numpy as np
columns = ['C' + str(i) for i in range(1, 46)]
df = pd.DataFrame(data=np.random.randint(99999, 99999999, size=(160000,45)),columns=columns)
df.to_parquet('data.parquet')

运行下面的 FastAPI 应用并分别访问每个终结点,以检查完成将数据加载和转换为 JSON 的过程所需的时间。

app.py

from fastapi import FastAPI, APIRouter, Response, Request
from fastapi.routing import APIRoute
from typing import Callable
import pandas as pd
import json
import time
import ujson
import orjson

class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["Response-Time"] = str(duration)
print(f"route duration: {duration}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@router.get("/defaultFastAPIencoder")
def get_data_default():
df = pd.read_parquet('data.parquet')   
return df.to_dict(orient="records")

@router.get("/orjson")
def get_data_orjson():
df = pd.read_parquet('data.parquet')
return Response(orjson.dumps(df.to_dict(orient='records')), media_type="application/json")
@router.get("/ujson")
def get_data_ujson():
df = pd.read_parquet('data.parquet')   
return Response(ujson.dumps(df.to_dict(orient='records')), media_type="application/json")
# Preferred way  
@router.get("/pandasJSON")
def get_data_pandasJSON():
df = pd.read_parquet('data.parquet')   
return Response(df.to_json(orient="records"), media_type="application/json")  
app.include_router(router)

尽管使用上述/pandasJSON响应时间非常快(这应该是首选方式),但在浏览器上显示数据时可能会遇到一些延迟。但是,这与服务器端无关,而是与客户端有关,因为浏览器正在尝试显示大量数据。如果您不想显示数据,而是让用户将数据下载到他们的设备(这会快得多),则可以使用attachment参数并在Response中设置Content-Disposition标头并传递filename,向浏览器指示应下载文件。有关更多详细信息,请查看此答案和此答案。

@router.get("/download")
def get_data():
df = pd.read_parquet('data.parquet')
headers = {'Content-Disposition': 'attachment; filename="data.json"'}
return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')

我还应该提到,有一个库,叫做Dask,它可以处理大型数据集,如此处所述,以防您必须处理大量需要很长时间才能完成的记录。与 Pandas 类似,您可以使用.read_parquet()方法来读取文件。由于 Dask 似乎没有提供等效的.to_json()方法,您可以使用df.compute()将 Dask 数据帧转换为 Pandas DataFrame,然后使用 Pandasdf.to_json()将数据帧转换为 JSON 字符串,并返回它,如上所示。

我还建议您查看此答案,它提供了有关流式传输/返回数据帧的详细信息和解决方案,以防您正在处理大量数据,将它们转换为 JSON(使用.to_json())或 CSV(使用.to_csv())可能会导致服务器端出现内存问题,如果您选择将输出字符串(JSON 或 CSV)存储到 RAM 中(这是默认行为, 如果不将路径参数传递给上述函数),因为已经为原始数据帧分配了大量内存。

我想json.loads(result)会在您的情况下返回字典数据类型,并且您正在过滤字典数据类型。您可以将字典数据类型作为 JSON 发送,如下所示:

from fastapi.responses import JSONResponse
@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
if year is None:
data = parse_parquet(f'path/{year}_data.parquet')
# no year
if year is not None:
data = parse_parquet(f'path/all_data.parquet')
if id is not None:
data = [d for d in data if d['id'] == id]
return JSONResponse(content=json_compatible_item_data)

相关内容

最新更新