我正在尝试使用Python中的多处理模块对代码进行并行化,以找到相似性矩阵。当我使用带有10 X 15元素的小np.ndarray时,它工作得很好。但是,当我将np.ndarray扩展到3613 X 7040元素时,系统内存不足。
下面是我的代码。
import multiprocessing
from multiprocessing import Pool
## Importing Jacard_similarity_score
from sklearn.metrics import jaccard_similarity_score
# Function for finding the similarities between two np arrays
def similarityMetric(a,b):
return (jaccard_similarity_score(a,b))
## Below functions are used for Parallelizing the scripts
# auxiliary funciton to make it work
def product_helper1(args):
return (similarityMetric(*args))
def parallel_product1(list_a, list_b):
# spark given number of processes
p = Pool(8)
# set each matching item into a tuple
job_args = getArguments(list_a,list_b)
# map to pool
results = p.map(product_helper1, job_args)
p.close()
p.join()
return (results)
## getArguments function is used to get the combined list
def getArguments(list_a,list_b):
arguments = []
for i in list_a:
for j in list_b:
item = (i,j)
arguments.append(item)
return (arguments)
现在,当我运行下面的代码时,系统内存不足,挂起了。我正在通过两个numpy.ndarray testMatrix1和testMatrix2,它们的大小为(36137040)
resultantMatrix = parallel_product1(testMatrix1,testMatrix2)
我刚开始在Python中使用这个模块,并试图理解我哪里出了问题。感谢您的帮助。
很可能,问题只是组合爆炸。你试图提前实现主进程中的所有对,而不是实时生成它们,所以你存储了大量的内存。假设ndarray
包含double
值,这些值变为Python float
,则getArguments
返回的list
的内存使用量大致为每对一个tuple
和两个float
的成本,或者大约为:
3613 * 7040 * (sys.getsizeof((0., 0.)) + sys.getsizeof(0.) * 2)
在我的64位Linux系统上,这意味着在工作人员做任何事情之前,Py3上大约有2.65 GB的RAM,Py2上大约有2.85 GB。
如果您可以使用生成器以流式方式处理数据,因此参数会延迟生成,并在不再需要时丢弃,那么您可能会显著减少内存使用:
import itertools
def parallel_product1(list_a, list_b):
# spark given number of processes
p = Pool(8)
# set each matching item into a tuple
# Returns a generator that lazily produces the tuples
job_args = itertools.product(list_a,list_b)
# map to pool
results = p.map(product_helper1, job_args)
p.close()
p.join()
return (results)
这仍然需要将所有结果放入内存;如果product_helper
返回float
s,那么64位机器上result
list
的预期内存使用量仍将在0.75GB左右,这是相当大的;如果你可以以流的方式处理结果,迭代p.imap
甚至更好的p.imap_unordered
的结果(后者返回计算的结果,而不是生成器生成参数的顺序),并将它们写入磁盘,或者以其他方式确保它们在内存中快速释放,将节省大量内存;下面只是打印出来,但以某种可重新生成的格式将它们写入文件也是合理的。
def parallel_product1(list_a, list_b):
# spark given number of processes
p = Pool(8)
# set each matching item into a tuple
# Returns a generator that lazily produces the tuples
job_args = itertools.product(list_a,list_b)
# map to pool
for result in p.imap_unordered(product_helper1, job_args):
print(result)
p.close()
p.join()
map
方法通过进程间通信将所有数据发送给工作者。正如目前所做的那样,这消耗了大量的资源,因为你正在发送
我建议它修改getArguments
,以列出矩阵中需要组合的索引的元组。这只是必须发送到工作进程的两个数字,而不是矩阵的两行!然后,每个工作者都知道要使用矩阵中的哪些行。
在调用map
之前,加载这两个矩阵并将它们存储在全局变量中。这样每个工作人员都可以访问它们。只要它们没有在worker中被修改,操作系统的虚拟内存管理器就不会复制相同的内存页,从而降低内存使用率。