为什么 dask worker 会因为大小"small"任务上的内存错误而失败?[达斯克袋]



我正在多个图像上运行管道。管道包括从文件系统读取图像,对每个图像进行处理,然后将图像保存到文件系统。但是,dask 工作线程由于内存错误而失败。 有没有办法确保 dask 工作人员不会在内存中加载太多图像?即等到工作线程上有足够的空间,然后再在新映像上启动处理管道。

我有一个调度程序和 40 个工作线程,有 4 个核心、15GB 内存并运行 Centos7。我正在尝试批量处理 125 张图像;每个图像都相当大,但足够小,可以容纳工人;整个过程大约需要3GB。

我尝试处理少量的图像,效果很好。

编辑

from dask.distributed import Client, LocalCluster
# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))
paths = ['list', 'of', 'paths']
# Read the file data from each path
data = client.map(read, path, resources={'process': 1)
# Apply foo to the data n times
for _ in range(n):
data = client.map(foo, x, resources={'process': 1)
# Save the processed data
data.map(save, x, resources={'process': 1)
# Retrieve results
client.gather(data)

我希望图像是处理的,因为工人上有可用的空间,但似乎所有图像都同时加载到不同的工人身上。

编辑: 我的问题是所有任务都分配给工作人员,他们没有足够的内存。我发现了如何限制工人在单个时刻处理的任务数量[https://distributed.readthedocs.io/en/latest/resources.html#resources 分别应用于每个工人进程](请参阅此处)。 但是,有了这个限制,当我执行任务时,它们都完成了读取步骤,然后是进程步骤,最后是保存步骤。这是一个问题,因为映像会溢出到磁盘。

有没有办法在开始新任务之前完成每个任务? 例如,在worker-1上:读取(img1)->进程(img1)->save(img1)->read(img2)->...

Dask 通常不知道任务需要多少内存,它只能知道输出的大小,并且只有在它们完成之后才能知道。这是因为 Dask 只是执行一个 pthon 函数,然后等待它完成;但是所有OSRT的事情都可以在Python函数中发生。您通常应该期望开始与可用的工作线程核心一样多的任务 - 正如您所发现的那样。

如果你想要更小的总内存负载,那么你的解决方案应该很简单:有足够少的辅助角色,这样,如果所有工作线程都使用你可以预期的最大内存,你仍然有一些多余的系统来应对。

编辑:您可能想尝试在提交之前在图形上运行优化(尽管我认为无论如何都应该发生这种情况),因为听起来您的线性任务链应该"融合"。 http://docs.dask.org/en/latest/optimize.html

相关内容

最新更新