我正试图使用Dask来处理一个大于内存的数据集,该数据集存储在保存为NumPy文件的块中。我懒洋洋地加载数据:
array = da.concatenate([
da.from_delayed(
dask.delayed(np.load)(path),
shape=(size, window_len, vocab_size),
dtype=np.float32
)
for path, size in zip(shard_paths, shard_sizes)
])
然后我使用da.map_blocks
:对文件进行一些处理
da.map_blocks(fn, array, drop_axis=[-1]).compute()
当我运行这个程序时,我的进程会被杀死,可能是因为内存使用率高(不仅数据大于内存,而且每个进程都有内存限制(。
我可以通过顺序处理块来轻松地限制内存,但这样我就不会从Dask提供的并行性中受益。
如何限制Dask使用的内存(例如,一次只加载一定数量的块(,同时尽可能多地并行处理块?
可以使用resource
模块限制Unix上进程使用的内存:
import resource
resource.setrlimit(resource.RLIMIT_AS, (max_memory, max_memory))
一旦达到这个极限,Dask似乎能够减少内存使用量。
然而,进程仍然可能在延迟的np.load
上崩溃,因此这并不一定能解决问题。