使用 dask-ec2 脚本设置集群后,我尝试将 sql 表作为 dask 数据帧读取。我的查询如下所示:
import dask.dataframe as dd
from dask.distributed import Client, progress
c = Client('127.0.0.1:8786')
df = dd.read_sql_table(sql_table_name, uri, index_col=column_1,
columns=[column_2, column_3, column_4], npartitions=393, parse_dates=
[date_column_1, date_column_2])
其中 sql_table_name 和 uri 是特定于我的数据库的字符串。然后,我尝试使用 dask.distributed 客户端来持久化 df:
df = c.persist(df)
progress(df)
进度条在 0.6 秒处显示异常。我使用 c.get_futures_error(df( 来尝试理解原因并返回如下内容:
(<function distributed.worker.execute_task>,
((<function dask.compatibility.apply>,
<function pandas.io.sql.read_sql>,
[<sqlalchemy.sql.selectable.Select at 0x7fd5e3b0a4e0; Select object>,
*the uri I used*],
(dict,
[['parse_dates',
[date_column_1, date_column_2]],
['index_col', column_1]])),),
{},
[])
如果能提供任何关于为什么我可能会遇到这些期货错误以及我可以做些什么来减轻这些错误的指导,我将不胜感激。
我相信你正在寻找Client.recreate_error_locally
>>> future = c.submit(div, 1, 0)
>>> future.status
'error'
>>> c.recreate_error_locally(future)
ZeroDivisionError: division by zero