Dask + PostgreSQL read_sql_table:数据类型不正确index_col



我正在尝试在一个非常大的PostgreSQL表中的列上运行mean((函数。由于我无法在内存中加载列,因此我选择了分布式 Dask 的并行性和分区。

系统配置:

12 cores / 24 threads
64 GB RAM
SSD

我尝试 (1( 使用 24 个进程最大化线程数 (1( 和 (2( 我尝试通过每个进程 24 个线程最大化进程数 (1(。

问题是,无论哪种情况,从延迟任务都需要>= 24 秒。此任务主要由用于设置 SQL 连接的 sqlalchemy 组件组成。

从延迟任务的较长执行时间似乎与 sql 查询有关:我的索引列是数据类型 bigint,但 Dask 生成基于 float (1000.01( 的 where 条件。这是整个操作中的巨大成本。尽管 Dask 数据帧指示 dtype int64,但数据本身实际上是浮点数。

read_sql_table:

data = dd.read_sql_table("<table>",'postgresql+psycopg2://<user>:<pw>@<ip>:<port>/<db>',index_col='<int_col>',bytes_per_chunk=1e6 - 1e9)

我期待:

select * from <table> where <int_col> >= int and <int_col> < int

但达斯克是这样做的:

select * from <table> where <int_col> >= float and <int_col> < float

为什么 Dask 不在 SQL 查询中应用正确的索引数据类型?

https://github.com/dask/dask/blob/master/dask/dataframe/io/sql.py

解决方案,sql.py 中的起始线 186:

parts = []
lowers, uppers = divisions[:-1], divisions[1:]
for i, (lower, upper) in enumerate(zip(lowers, uppers)):
if dtype.kind == "i":
upper = int(np.ceil(upper))
lower = int(np.ceil(lower))
cond = index <= upper if i == len(lowers) - 1 else index < upper
q = sql.select(columns).where(sql.and_(index >= lower, cond)).select_from(table)
parts.append(delayed(_read_sql_chunk)(q, uri, meta, **kwargs))

最新更新