避免同时读取一个dask数组的多个文件



从一个库中,我得到一个读取文件并返回numpy数组的函数。

我想用多个文件中的多个块构建一个Dask数组。

每个块都是对文件调用函数的结果。

当我要求Dask进行计算时,Dask会要求函数同时从硬盘读取多个文件吗?

如果是这样的话,如何避免这种情况?我的电脑没有并行文件系统。

示例:

import numpy as np
import dask.array as da
import dask
# Make test data
n = 2
m = 3
x = np.arange(n * m, dtype=np.int).reshape(n, m)
np.save('0.npy', x)
np.save('1.npy', x)
# np.load is a function that reads a file 
# and returns a numpy array.
# Build delayed
y = [dask.delayed(np.load)('%d.npy' % i)
for i in range(2)]
# Build individual Dask arrays.
# I can get the shape of each numpy array without 
# reading the whole file.
z = [da.from_delayed(a, (n, m), np.int) for a in y]
# Combine the dask arrays
w = da.vstack(z)
print(w.compute())

您可以使用lock原语-这样加载程序函数就可以获得读取释放。

read_lock = distributed.Lock('numpy-read')
@dask.delayed
def load_numpy(lock, fn):
lock.acquire()
out = np.load(fn)
lock.release()
return out
y = [load_numpy(lock, '%d.npy' % i) for i in range(2)]

此外,da.from_array接受一个锁,因此您可以从直接提供锁的延迟函数np.load创建单独的数组。

或者,您可以指定一个单位资源分配给worker(具有多个线程(,然后按照每个文件读取任务一个单元的要求进行计算(或持久化(,如链接文档中的示例所示。

回复评论:问题中没有提到to_hdf,我不知道为什么现在会被质疑;但是,您可以将da.store(compute=False)h5py.File一起使用,然后指定调用compute时要使用的资源。请注意,这不会使数据进入内存。

最新更新