当使用当前版本的dask('0.7.5',github:[a1](时,由于数据量很大,我能够通过dask.dataframe api执行分区计算。但是对于在 bcolz 中存储为记录的大型数据帧('0.12.1',github:[a2](,我在执行此操作时遇到了索引错误:
import dask.dataframe as dd
import bcolz
ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))
# some calculations
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)
df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)
# actual computation
df_dd_out.compute()
错误为(缩写回溯输出(:
# ...
File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds
实际上,错误仅在执行dd.concat操作时才存在。
out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()
正在工作。
但是,当在内存中读取部分数据时,此错误在某些情况下也存在,至少对于分区长度(npartition(>1和特定数据大小而言。
ctable_mem_b = ctable[:int(1E7)] # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
npartitions=10)
查看完整的测试代码_test_dask_error.py,以及带有回溯的完整输出_test_out.txt。
实际上,在那一步中,我停止了调查,因为我不知道如何 async.py 根本原因调试此错误。当然,我会将此报告为错误(如果没有提示用户/使用错误(。但是:如何进行调试以找到根本原因?
_[a1]: _https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5
_[a2]: _https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a
自 dask 文档的常见问题解答
问:使用 dask 时如何调试程序?
如果你想深入研究 Python 调试器,一个常见的原因挫败感是异步调度程序,因为它们运行您的不同工作线程上的代码,无法提供对 Python 的访问调试器。 幸运的是,您可以更改为同步调度程序,例如 通过提供get=
关键字dask.get
或dask.async.get_sync
到compute
方法:
my_array.compute(get=dask.async.get_sync)
dask.async.get_sync
和dask.get
都将提供回溯遍历。 dask.async.get_sync
使用与异步相同的机制调度程序,但只有一个工作线程。 dask.get
很简单,但确实如此不缓存数据,因此对于某些工作负载来说可能会很慢。
评论
我很想知道问题是什么。 如果使用上述方法后原因不是很明显,那么我建议在 dask 问题跟踪器上提出问题。
使用后
df_dd_mem_b.compute(get=dask.async.get_sync)
很明显,错误
#...
File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 1637, in _loc
result = df.loc[start:stop]
发生了,因为_loc
没有给出确切的边界,但stop
是越界的
df_dd_out_mem_b.divisions
外:(0, 1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 9999999(
并在中断的任务中被调用(例如(
df.loc[1000000:2000000]
尽管最后一个索引标签是1999999。
问题是,对于pandas.DataFrame.loc
给出:"允许的输入是:[...]带有标签"a":'f'的切片对象,(请注意,与通常的python切片相反,开始和停止都包括在内!(摘自文档稳定版 0.17.1(。显然对于小数字,没有引发越界错误,但对于大数字(i>~1E6(,我通过此测试得到了 IndexError:
df = pd.DataFrame({0: range(i)}).loc[0:i]
用pd。DataFrame.iloc 根据文档,这种不确定的行为似乎确实不是问题:"如果请求的索引器越界,.iloc 将引发 IndexError,但允许越界索引的切片索引器除外。
df = pd.DataFrame({0: range(i)}).iloc[0:i]
对于给定的 dask 问题,它肯定不是正确的修复,因为_loc
写得更通用,但最终仅适用于本质上是特定调用的特定调用
result = df.loc[slice(*df.index[[0, -1]])]