如何直接汇总python多处理获得的结果,而不返回所有单独的结果以节省内存



我有一个函数可以创建一个大掩码(布尔数组(。我想多次调用此函数,并创建一个形状相同的总掩码,该掩码在任何单个掩码中的索引为True时都为True。

由于掩码的计算需要很长时间,我已经将其并行化,但该函数现在消耗了大量内存,因为我首先创建所有单独的掩码,然后将其组合,这意味着我必须存储大约40.000个单独的掩码。在使用多处理计算下一个掩码之前,是否有可能将返回的单个掩码直接添加到总掩码中?

这是问题的示例代码:

import numpy as np
from multiprocessing import Pool

def return_something(seed):
np.random.seed(seed)
return np.random.choice([True, False], size=shape, p=[0.1, 0.9])

shape = (50, 50)
ncores = 4
seeds = np.random.randint(low=0, high=np.iinfo(np.int32).max, size=10)
# Without parallelisation, very slow:
mask = np.zeros(shape, dtype=bool)
for seed in seeds:
mask |= return_something(seed)

# With parallelisation, takes too much memory
p = Pool(ncores)
mask_parallel = np.any(list(p.imap(return_something, seeds)), axis=0)

我想我对(I(map函数理解不够。我知道multiprocessing.map返回一个生成器,例如可以使用tqdm显示进度条,代码如下:

list(tqdm.tqdm(p.imap(fct, inputs), total=len(inputs))

由于进度条是在多处理运行过程中更新的,我认为在运行过程中一定可以访问结果,并可能对其进行汇总,但我不知道如何进行。

谢谢你的帮助!

遍历种子是没有意义的,因为您在return_somethign中创建了一个非常大的数组。因此,您必须将这个数组创建划分为一些子创建,并在这些子创建中迭代。Pool.map()方法返回每次迭代中执行的函数的结果列表。向您展示您的案例的一般实现。我所做的只是将每一行的创建并行化,并通过map()函数将它们放在一起。

import numpy as np
import multiprocessing as mp
def return_something(i):
mask = np.random.choice([True, False], size=(shape[0],), p=[0.1, 0.9])
return mask
shape = (5000, 5000)
if __name__ == "__main__":
pool = mp.Pool(mp.cpu_count())
results = pool.map(return_something, [i for i in range(shape[1])])
pool.close()
print(len(results))

关于你的评论,我展示了一种方法,一旦计算(动态(,就可以将结果项附加到列表中

import numpy as np
from multiprocessing import Pool
import time
def return_something(seed):
np.random.seed(seed)
return np.random.choice([True, False], size=shape, p=[0.1, 0.9])

shape = (50, 50)
ncores = 4
seeds = np.random.randint(low=0, high=np.iinfo(np.int32).max, size=100000)
mask = []
if __name__ == "__main__":
p = Pool(12)
start = time.time()
for res in p.imap(return_something, seeds, chunksize=1):
mask.append(res)
print("{} (Time elapsed: {}s)".format(len(res), time.time() - start))
p.close()
print(len(mask))

最新更新