跨多个请求存储大型数据帧的Flask



我有一个Flask web应用程序,它使用了一个大型DataFrame(数百兆(。DataFrame在应用程序中用于几种不同的机器学习模型。我想在应用程序中只创建一次DataFrame,并在多个请求中使用它,这样用户就可以基于相同的数据构建不同的模型。Flask会话不是为大数据构建的,因此这不是一个选项。我不想回去重新创建DataFrame,以防数据源是csv文件(是的(。是

我有一个有效的解决方案,但在堆栈溢出中找不到任何关于这个解决方案的讨论。这让我怀疑我的解决方案可能不是一个好的设计想法。我一直认为,在软件开发中,一条成功的道路就是一条精心选择的道路。

我的解决方案只是创建一个带有一个类变量的数据持有者类:

class DataHolder:
dataFrameHolder = None

现在,dataFrameHolder在所有类实例中都是已知的(就像Java中的静态变量一样(,因为它存储在服务器的内存中。

我现在可以创建一次DataFrame,将其放入DataHolder类:

import pandas as pd
from dataholder import DataHolder
result_set = pd.read_sql_query(some_SQL, connection)
df = pd.DataFrame(result_set, columns=['col1', 'col2',....]
DataHolder.dataFrameHolder = df

然后从导入DataHolder类的任何代码中访问该DataFrame。然后,我可以在应用程序的任何地方使用存储的DataFrame,包括跨不同的请求:

.
.
modelDataFrame = DataHolder.dataFrameHolder
do_some_model(modelDataFrame)
.
.

这是一个坏主意,一个好主意,还是还有其他我不知道的东西已经解决了问题?

可以使用

Redis。我的用例是较小的数据帧,所以没有用较大的数据帧进行测试。这使我能够向多个浏览器客户端提供3秒的滴答数据。pyarrow串行化/去串行化运行良好。在本地和AWS/GCloud和Azure 之间工作

GET路线

@app.route('/cacheget/<path:key>', methods=['GET'])
def cacheget(key):
c = mycache()
data = c.redis().get(key)
resp = Response(BytesIO(data), mimetype="application/octet-stream", direct_passthrough=True)
resp.headers["key"] = key
resp.headers["type"] = c.redis().get(f"{key}.type")
resp.headers["size"] = sys.getsizeof(data)
resp.headers["redissize"] = sys.getsizeof(c.redis().get(key))
return resp

将数据帧放入缓存的示例路由

@app.route('/sensor_data', methods=['POST'])
def sensor_data() -> str:
c = mycache()
dfsensor = c.get("dfsensor")
newsensor = json_normalize(request.get_json())
newsensor[["x","y"]] = newsensor[["epoch", "value"]]
newsensor["xy"] = newsensor[['x', 'y']].agg(pd.Series.to_dict, axis=1)
newsensor["amin"] = newsensor["value"]
newsensor["amax"] = newsensor["value"]
newsensor = newsensor.drop(columns=["x","y"])
# add new data from serial interface to start of list (append old data to new data).
# default time as now to new data
dfsensor = newsensor.append(dfsensor, sort=False)
# keep size down - only last 500 observations
c.set("dfsensor", dfsensor[:500])
del dfsensor
return jsonify(result={"status":"ok"})

效用类别

import pandas as pd
import pyarrow as pa, os
import redis,json, os, pickle
import ebutils
from logenv import logenv
from pandas.core.frame import DataFrame
from redis.client import Redis
from typing import (Union, Optional)
class mycache():
__redisClient:Redis
CONFIGKEY = "cacheconfig"
def __init__(self) -> None:
try:
ep = os.environ["REDIS_HOST"]
except KeyError:
if os.environ["HOST_ENV"] == "GCLOUD":
os.environ["REDIS_HOST"] = "redis://10.0.0.3"
elif os.environ["HOST_ENV"] == "EB":
os.environ["REDIS_HOST"] = "redis://" + ebutils.get_redis_endpoint()
elif os.environ["HOST_ENV"] == "AZURE":
#os.environ["REDIS_HOST"] = "redis://ignore:password@redis-sensorvenv.redis.cache.windows.net"
pass # should be set in azure env variable
elif os.environ["HOST_ENV"] == "LOCAL":
os.environ["REDIS_HOST"] = "redis://127.0.0.1"
else:
raise "could not initialise redis"
return # no known redis setup
#self.__redisClient = redis.Redis(host=os.environ["REDIS_HOST"])
self.__redisClient = redis.Redis.from_url(os.environ["REDIS_HOST"])
self.__redisClient.ping()
# get config as well...
self.config = self.get(self.CONFIGKEY)
if self.config is None:
self.config = {"pyarrow":True, "pickle":False}
self.set(self.CONFIGKEY, self.config)
self.alog = logenv.alog()
def redis(self) -> Redis:
return self.__redisClient

def exists(self, key:str) -> bool:
if self.__redisClient is None:
return False
return self.__redisClient.exists(key) == 1
def get(self, key:str) -> Union[DataFrame, str]:
keytype = "{k}.type".format(k=key)
valuetype = self.__redisClient.get(keytype)
if valuetype is None:
if (key.split(".")[-1] == "pickle"):
return pickle.loads(self.redis().get(key))
else:
ret = self.redis().get(key)
if ret is None:
return ret
else:
return ret.decode()
elif valuetype.decode() == str(pd.DataFrame):
# fallback to pickle serialized form if pyarrow fails
# https://issues.apache.org/jira/browse/ARROW-7961
try:
return pa.deserialize(self.__redisClient.get(key))
except pa.lib.ArrowIOError as err:
self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
return pickle.loads(self.redis().get(f"{key}.pickle"))
except OSError as err:
if "Expected IPC" in str(err):
self.alog.warning("using pickle from cache %s - %s - %s", key, pa.__version__, str(err))
return pickle.loads(self.redis().get(f"{key}.pickle"))
else:
raise err
elif valuetype.decode() == str(type({})):
return json.loads(self.__redisClient.get(key).decode())
else:
return self.__redisClient.get(key).decode() # type: ignore
def set(self, key:str, value:Union[DataFrame, str]) -> None:
if self.__redisClient is None:
return
keytype = "{k}.type".format(k=key)
if str(type(value)) == str(pd.DataFrame):
self.__redisClient.set(key, pa.serialize(value).to_buffer().to_pybytes())
if self.config["pickle"]:
self.redis().set(f"{key}.pickle", pickle.dumps(value))
# issue should be transient through an upgrade....
# once switched off data can go away
self.redis().expire(f"{key}.pickle", 60*60*24)
elif str(type(value)) == str(type({})):
self.__redisClient.set(key, json.dumps(value))
else:
self.__redisClient.set(key, value)
self.__redisClient.set(keytype, str(type(value)))

if __name__ == '__main__':
os.environ["HOST_ENV"] = "LOCAL"
r = mycache()
rr = r.redis()
for k in rr.keys("cache*"):
print(k.decode(), rr.ttl(k))
print(rr.get(k.decode()))

我也遇到了类似的问题,因为我正在导入CSV(100 MB(,并为每个请求动态创建DataFrames,正如您所说,这很恶心!我还尝试了REDIS方式来缓存它,并在一段时间内提高了性能,直到我意识到对底层数据进行更改也意味着更新缓存。

然后我发现了一个超越CSV的世界,还有像Pickle、Feather、Parquet等更高性能的文件格式。你可以在这里阅读更多关于他们的信息。您可以根据需要导入/导出CSV,但要使用中间格式进行处理。

不过我确实遇到了一些问题。我读到Pickle存在安全问题,尽管我仍然在使用它。Feather不允许我在数据中写入一些object类型,它需要它们categorized。你的里程数可能会有所不同,但如果你有良好的干净数据,请使用Feather。

最近,我发现我使用Datatable而不是Pandas来管理大数据,并将它们存储在Jay中,以获得更好的读/写性能。

然而,这意味着将使用Pandas的代码重新写入DataTable,但我相信API非常相似。我自己还没有做过,因为代码库很大,但你可以尝试一下。

相关内容

  • 没有找到相关文章

最新更新