我正在尝试在一个非常大的PostgreSQL表中的列上运行mean((函数。由于我无法在内存中加载列,因此我选择了分布式 Dask 的并行性和分区。
系统配置:
12 cores / 24 threads
64 GB RAM
SSD
我尝试 (1( 使用 24 个进程最大化线程数 (1( 和 (2( 我尝试通过每个进程 24 个线程最大化进程数 (1(。
问题是,无论哪种情况,从延迟任务都需要>= 24 秒。此任务主要由用于设置 SQL 连接的 sqlalchemy 组件组成。
从延迟任务的较长执行时间似乎与 sql 查询有关:我的索引列是数据类型 bigint,但 Dask 生成基于 float (1000.01( 的 where 条件。这是整个操作中的巨大成本。尽管 Dask 数据帧指示 dtype int64,但数据本身实际上是浮点数。
read_sql_table:
data = dd.read_sql_table("<table>",'postgresql+psycopg2://<user>:<pw>@<ip>:<port>/<db>',index_col='<int_col>',bytes_per_chunk=1e6 - 1e9)
我期待:
select * from <table> where <int_col> >= int and <int_col> < int
但达斯克是这样做的:
select * from <table> where <int_col> >= float and <int_col> < float
为什么 Dask 不在 SQL 查询中应用正确的索引数据类型?
https://github.com/dask/dask/blob/master/dask/dataframe/io/sql.py
解决方案,sql.py 中的起始线 186:
parts = []
lowers, uppers = divisions[:-1], divisions[1:]
for i, (lower, upper) in enumerate(zip(lowers, uppers)):
if dtype.kind == "i":
upper = int(np.ceil(upper))
lower = int(np.ceil(lower))
cond = index <= upper if i == len(lowers) - 1 else index < upper
q = sql.select(columns).where(sql.and_(index >= lower, cond)).select_from(table)
parts.append(delayed(_read_sql_chunk)(q, uri, meta, **kwargs))