如何在分布式Dask上运行SQLAlchemy查询



我正在尝试使用我设置的dask集群运行并并行化这个sqlalchemy查询,因为我没有足够的内存从本地计算机执行它。

我的代码如下-我不确定这是否是实现这一目标的最佳方式:

from dask.distributed import Client
import dask.dataframe as dd
from dask.delayed import delayed
client = Client(<IP Address>)
recent_dates = ['2020-04-24', '2020-04-23', 2020-04-22']
query = """SELECT * FROM table WHERE date = '%s'"""
queries = [query.format(d) for d in recent_dates]
from sqlalchemy.engine import create_engine
conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
connect_args={'protocol': 'https',
'requests_kwargs': {'verify': key}})
con = engine.connect()
df = dd.from_delayed([delayed(pd.read_sql_query)(q, conn) for q in queries])

我得到以下错误:

TypeError: can't pickle _thread.RLock objects

您应该使用函数read_sql_table,它正是为此目的而创建的。如果您阅读了文档字符串和/或代码,您会发现正是查询本身传递给工作人员,这些工作人员在本地创建自己的引擎实例。这是因为sqlalchemy实例具有无法在工作线程之间发送的状态,正如您所发现的那样。

注意,read_sql_table还关心对数据进行分区,因为这是Dask,重点是处理大于内存的数据。在您的示例中,我猜索引/分区列是date,并且您希望传递要显式拆分的"divisions"。

最新更新