限制Dask在同步计算过程中使用的内存



我正试图使用Dask来处理一个大于内存的数据集,该数据集存储在保存为NumPy文件的块中。我懒洋洋地加载数据:

array = da.concatenate([
da.from_delayed(
dask.delayed(np.load)(path),
shape=(size, window_len, vocab_size),
dtype=np.float32
)
for path, size in zip(shard_paths, shard_sizes)
])

然后我使用da.map_blocks:对文件进行一些处理

da.map_blocks(fn, array, drop_axis=[-1]).compute()

当我运行这个程序时,我的进程会被杀死,可能是因为内存使用率高(不仅数据大于内存,而且每个进程都有内存限制(。

我可以通过顺序处理块来轻松地限制内存,但这样我就不会从Dask提供的并行性中受益。

如何限制Dask使用的内存(例如,一次只加载一定数量的块(,同时尽可能多地并行处理块?

可以使用resource模块限制Unix上进程使用的内存:

import resource
resource.setrlimit(resource.RLIMIT_AS, (max_memory, max_memory))

一旦达到这个极限,Dask似乎能够减少内存使用量。

然而,进程仍然可能在延迟的np.load上崩溃,因此这并不一定能解决问题。

最新更新