Dask:在过滤过程中内存不足(MRE)



tl;dr

我想根据列的值过滤Dask数据帧,即

data.loc[data[column].lt(value)].to_parquet(path)

但是,尽管每个分区都比可用内存小20倍,我还是用光了内存。

示例数据

让我们首先创建一些样本数据来使用

import numpy as np
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame(np.random.uniform(size=int(5e8)), columns=['val'])
ddf = dd.from_pandas(df, npartitions=800)  # each partition has about 10Mb in memory
ddf.to_parquet('sample.parq')

解决方案尝试

让我们假设我的机器只有512Mb的内存。当然,我的问题规模要大得多(在我的情况下是TB(,但这个简单的问题似乎与我在更大的数据集中遇到的问题相同。

因此,我将使用两个具有200Mb的工作人员,每个

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, memory_limit='200Mb')
client = Client(cluster)
data = dd.read_parquet('sample.parq')
task = data.loc[data['val'].lt(0.5)].to_parquet('sample-output.parq', compute=False)

由于每个分区占用10Mb的内存,根据Wes Kinney的经验法则,我可能最多需要50-100Mb的内存来处理一个分区,所以200Mb应该绰绰有余。

然而,当我运行task.compute()时,工作人员几乎立即耗尽内存,重新启动,然后完全停止。

我尝试过的东西

限制资源

我还试图限制员工资源。据我所知,这应该让工人知道,一次只能处理一项任务。也许这太保守了,但在这种情况下,我预计会发生死锁,而不是内存耗尽。

cluster = LocalCluster(n_workers=2, memory_limit='200Mb', resources={'m': 200})
task.compute(resources={'m': 200})

然而,遗憾的是,结果是相同的

剖析阅读拼花地板的记忆使用情况

SultanOrazbayev建议我应该使用memory_profiler来查看加载单个分区时使用了多少内存,因为read_parquet的使用可能是罪魁祸首。

我写了test-load.py

import pandas as pd
@profile
def load_parq():
return pd.read_parquet('sample.parq/part.0.parquet')
if __name__ == '__main__':
df = load_parq()
print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')

并使用python3 -m memory_profiler test-load.py运行它。这是输出:

Memory footprint: 10.0MB
Filename: test-load.py
Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
3   74.750 MiB   74.750 MiB           1   @profile
4                                         def load_parq():
5  132.992 MiB   58.242 MiB           1       return pd.read_parquet('sample.parq/part.0.parquet')

而且——公平地说——即使是读取一个文件也需要比我想象的更多的内存。200MB也许还不够,但多少钱呢?

在我的设置中,两个工人的答案都是4GB。这实际上等于整个数据集。事实上,从仪表板上看,dask似乎很乐意同时加载几十个分区。如果它有4GB的内存,那也没关系,但如果它没有那么多内存,我该怎么办?

问题出现在处理的早期,即读取数据的过程中。如果您在Jupyter Lab中使用memory profiler(对于Python脚本,使用pip install memory_profiler(,那么您将看到,简单地加载带有panda的文件所使用的内存是文件大小的倍数。在我使用csv和parquet文件的实验中,内存乘数大约是底层文件大小的3到10倍(我使用的是pandas version 1.2.3(。

谷歌搜索显示pd.read_csvpd.read_parquet的高内存使用率是一个反复出现的问题。。。因此,除非你能找到一种高效的内存加载数据的方法,否则必须给工作人员更多的内存(或者就文件大小而言,负载要小得多(。请注意,这是在任何dask操作之前出现的问题,因此这超出了resources选项的控制范围。

我想我已经找到了这个案子的罪魁祸首。问题是,dask自动尝试使用尽可能多的可用内核。

from dask.distributed import Client
with Client(n_workers=2, memory_limit='300Mb') as client:
print(client)

产生

<Client: 'tcp://127.0.0.1:39337' processes=2 threads=48, memory=600.00 MB>

当我尝试读取拼花地板文件时,dask会使用所有48个可用的内核,立即耗尽内存。

这里的技巧是限制每个工作线程的线程数:

with Client(n_workers=2, memory_limit='300Mb', threads_per_worker=1) as client:
print(client)

产生

<Client: 'tcp://127.0.0.1:34421' processes=2 threads=2, memory=600.00 MB>

然后计算在任何时间点使用每个工作者大约200-250MB而没有任何问题地进行。

相关问题

  • 带有线程的dask.distributed LocalCluster与进程之间的差异

最新更新