Python Multiprocessing pool.map 无响应,工作进程过多



关于堆栈溢出的第一个问题,所以请耐心等待。我正在寻找计算组评级(长 numpy 数组(的方差。在没有并行处理的情况下运行程序工作正常,但鉴于每个进程都可以独立运行,并且有 32 个组,我希望利用多处理来加快速度。这对于 10

import numpy as np
from functools import partial
import multiprocessing
def variance_parallel(extra_matrices, group_num):
# do some variation calculation
# print confirmation that we have entered function, and group number
return single_group_var
def variance(extra_matrices, num_groups):
variance_partial = partial(variance_parallel, extra_matrices)
for g in list(range(num_groups)):
group_var = pool.map(variance_partial,range(g))
return(group_var)     
num_cores = multiprocessing.cpu_count() - 1
pool = multiprocessing.Pool(processes=num_cores)
variance(extra_matrices, num_groups)

运行上面的代码显示程序在最终不打印任何内容之前逐步构建它正在检查方差的组数 ([0],[0,1],[0,1,2],...(。

提前感谢任何帮助,如果我的格式/问题有点不对劲,请道歉!

  • 多个进程不共享数据
  • 发送到进程的数据需要复制

由于数组很大,因此问题很可能与将大型数组复制到进程中有关。此外,在Python的多处理中,将数据发送到进程是通过序列化完成的,序列化是(a(CPU密集型的,(b(需要额外的内存。

简而言之,多处理并不适合您的用例。由于 numpy 是本机代码扩展(GIL 不适用(并且是线程安全的,因此最好使用线程而不是多处理。使用线程,工作线程可以通过其父进程的地址空间共享数据,从而不必复制。

这应该会阻止程序内存不足。

但是,要使线程共享地址空间,它们共享的数据需要绑定到对象,就像在 python 类中一样。

如下所示 - 未经测试,因为代码示例不完整。

import numpy as np
from functools import partial
from threading import Thread
from multiprocessing import cpu_count
class Variance(Thread):
def __init__(self, extra_matrices, group_num):
Thread.__init__(self)
self.extra_matrices = extra_matrices
self.group_num = group_num
self.output = None
def run(self):
# do some variation calculation
# print confirmation that we have entered function, and group number
self.output = single_group_var
num_cores = cpu_count() - 1
results = []
for g in list(range(num_groups)):
workers = [Variance(extra_matrices, range(g)) 
for _ in range(num_cores)]
# Start threads
for worker in workers:
worker.start()
# Wait for completion
for worker in workers:
worker.join()
results.extend([w.output for w in workers])
print results

相关内容

  • 没有找到相关文章

最新更新