为什么 dask.dataframe compute() 结果在特定情况下给出 IndexError?如何查找异步错误的



当使用当前版本的dask('0.7.5',github:[a1](时,由于数据量很大,我能够通过dask.dataframe api执行分区计算。但是对于在 bcolz 中存储为记录的大型数据帧('0.12.1',github:[a2](,我在执行此操作时遇到了索引错误:

import dask.dataframe as dd
import bcolz
ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))
# some calculations
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)
df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)
# actual computation
df_dd_out.compute()

错误为(缩写回溯输出(:

# ...      
    File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
        raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds

实际上,错误仅在执行dd.concat操作时才存在。

out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()

正在工作。

但是,当在内存中读取部分数据时,此错误在某些情况下也存在,至少对于分区长度(npartition(>1和特定数据大小而言。

ctable_mem_b = ctable[:int(1E7)] # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
                             npartitions=10)

查看完整的测试代码_test_dask_error.py,以及带有回溯的完整输出_test_out.txt。

实际上,在那一步中,我停止了调查,因为我不知道如何 async.py 根本原因调试此错误。当然,我会将此报告为错误(如果没有提示用户/使用错误(。但是:如何进行调试以找到根本原因?

_[a1]: _https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5

_[a2]: _https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a

自 dask 文档的常见问题解答

问:使用 dask 时如何调试程序?

如果你想深入研究 Python 调试器,一个常见的原因挫败感是异步调度程序,因为它们运行您的不同工作线程上的代码,无法提供对 Python 的访问调试器。 幸运的是,您可以更改为同步调度程序,例如 通过提供get=关键字dask.getdask.async.get_synccompute方法:

my_array.compute(get=dask.async.get_sync)

dask.async.get_syncdask.get都将提供回溯遍历。 dask.async.get_sync使用与异步相同的机制调度程序,但只有一个工作线程。 dask.get很简单,但确实如此不缓存数据,因此对于某些工作负载来说可能会很慢。

评论

我很想知道问题是什么。 如果使用上述方法后原因不是很明显,那么我建议在 dask 问题跟踪器上提出问题。

使用后

df_dd_mem_b.compute(get=dask.async.get_sync)

很明显,错误

#...
    File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 1637, in _loc
        result = df.loc[start:stop]

发生了,因为_loc没有给出确切的边界,但stop是越界的

df_dd_out_mem_b.divisions

外:(0, 1000000, 2000000, 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 9999999(

并在中断的任务中被调用(例如(

df.loc[1000000:2000000]

尽管最后一个索引标签是1999999。

问题是,对于pandas.DataFrame.loc给出:"允许的输入是:[...]带有标签"a":'f'的切片对象,(请注意,与通常的python切片相反,开始和停止都包括在内!(摘自文档稳定版 0.17.1(。显然对于小数字,没有引发越界错误,但对于大数字(i>~1E6(,我通过此测试得到了 IndexError:

df = pd.DataFrame({0: range(i)}).loc[0:i]

用pd。DataFrame.iloc 根据文档,这种不确定的行为似乎确实不是问题:"如果请求的索引器越界,.iloc 将引发 IndexError,但允许越界索引的切片索引器除外。

df = pd.DataFrame({0: range(i)}).iloc[0:i]

对于给定的 dask 问题,它肯定不是正确的修复,因为_loc写得更通用,但最终仅适用于本质上是特定调用的特定调用

result = df.loc[slice(*df.index[[0, -1]])]

最新更新