使用dask.bag.map时内存分配高



我使用dask通过先前计算的外部对象arg的信息扩展任务包项。Dask似乎在计算过程开始时为每个分区一次性分配arg的内存。

是否有一个解决方案,以防止Dask从复制arg多次(并分配大量的内存)?

下面是一个简化的例子:

from pathlib import Path
import numpy as np
import pandas as pd
from dask import bag
in_dir = Path.home() / 'in_dir'
out_dir = Path.home() / 'out_dir'
in_dir.mkdir(parents=True, exist_ok=True)
out_dir.mkdir(parents=True, exist_ok=True)
n_files = 100
n_lines_per_file = int(1e6)
df = pd.DataFrame({
'a': np.arange(n_lines_per_file).astype(str)
})
for i in range(n_files):
df.to_csv(in_dir / f'{i}.txt', index=False, header=False)

def mapper(x, arg):
y = x  # map x to y using arg
return y

arg = np.zeros(int(1e7))
(
bag
.read_text(str(in_dir / '*.txt'))
.map((lambda x, y: x), arg)
.to_textfiles(str(out_dir / '*.txt'))
)

处理这个问题的一种策略是首先将您的数据scatter给worker:

import dask.bag, dask.distributed
client = dask.distributed.Client()
arg = np.zeros(int(1e7))
arg_f = client.scatter(arg, broadcast=True)
(
dask.bag
.read_text(str(in_dir / '*.txt'))
.map((lambda x, y: x), arg_f)
.to_textfiles(str(out_dir / '*.txt'))
)

将数据的副本发送给每个worker,但不会为每个task创建副本。

最新更新