我正在研究一个实时信息检索系统,该系统在本地SQL Server数据库中执行大约450万行的大型查询。
每个查询平均返回400.000行,并进行参数化,如下例所示:
SELECT Id, Features, Edges, Cluster, Objects FROM db.Image
WHERE Cluster = 16
AND Features IS NOT NULL
AND Objects IS NOT NULL
这些是我使用当前方法的时间:
Query time: 4.52361000 seconds
Query size: 394048 rows, 5 columns
虽然不一定不可用,但预计查询大小将快速增长,因此我需要一种更有效的方法来将大量行读取到DataFrame中。
目前,我使用pyodbc
建立一个连接到SQL Server和pd.read_sql
解析查询直接成一个DataFrame,然后被操纵。我正在寻找显着改善查询时间的方法,同时仍然允许我在获取数据后使用DataFrame操作。到目前为止,我已经尝试了dask
DataFrames, connectorX,以及与multithreading
并行化查询的失败尝试,但无济于事。
依靠其他解决方案,多线程,甚至完全不同的文件格式,如何改善读取这么多数据所需的时间?
代码示例
conn = connection() # I have a function that returns a connector
filter = 16
command = '''SELECT Id, Features, Edges, Cluster, Objects FROM Common.Image
WHERE Cluster = {} AND Features IS NOT NULL AND Objects IS NOT NULL'''.format(filter)
result = pd.read_sql(command, conn)
编辑
@tadman评论后:
如果可行的话考虑缓存,比如一旦你获取了数据,你可以把它保存在一个更紧凑的形式(Google Protobuf, Parquet等),以这种方式读取可以相当快,因为你通常只是IO绑定,而不是服务器/CPU绑定。
我看了看Parquet缓存,并找到了一种相当快的获取数据的方法:
- 为我的每个数据集群(1到21)创建压缩
parquet
文件。 - 使用
pyarrow
,读取所需的集群文件df_pq = pq.read_table("\cluster16.parquet")
- 使用
df = df_pq.to_pandas()
将parquet文件转换为pandas DataFrame - 照常进行
使用这种方法,我将总时间减少到1.12400
秒。
Pyspark dataframe比pandas dataframe运行得更快,应该提供更多的内存。如果已经有了pandas格式的数据框,可以这样进行转换:
spark_df = spark.createDataFrame(df)
modified_df=spark_df.filter("query here").collect()
如果需要,可以在主sql查询后转换回pandas。
链接:https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html