多处理 -- 线程池内存泄漏



我正在观察内存使用情况,我无法向自己解释。下面我提供了实际代码的精简版本,该代码仍然表现出此行为。该代码旨在完成以下操作:

读取 1000 行块的文本文件。每行都是一个句子。将这 1000 个句子分成 4 个生成器。将这些生成器传递到线程池,并对 250 个句子并行运行特征提取。 在我的实际代码中,我从整个文件的所有句子中积累特征和标签。 现在奇怪的事情来了:内存被分配,但即使没有积累这些值也不会再次释放!我认为这与线程池有关。总占用的内存量取决于为任何给定单词提取的特征数量。我在这里用range(100)模拟了这一点.看一看:

from sys import argv
from itertools import chain, islice
from multiprocessing import Pool
from math import ceil

# dummyfied feature extraction function
# the lengt of the range determines howmuch mamory is used up in total,
# eventhough the objects are never stored
def features_from_sentence(sentence):
return [{'some feature'  'some value'} for i in range(100)], ['some label' for i in range(100)]

# split iterable into generator of generators of length `size`
def chunks(iterable, size=10):
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, size - 1))

def features_from_sentence_meta(l):
return list(map (features_from_sentence, l))

def make_X_and_Y_sets(sentences, i):
print(f'start: {i}')
pool = Pool()
# split sentences into a generator of 4 generators
sentence_chunks = chunks(sentences, ceil(50000/4))
# results is a list containing the lists of pairs of X and Y of all chunks
results = map(lambda x : x[0], pool.map(features_from_sentence_meta, sentence_chunks))
X, Y = zip(*results)
print(f'end: {i}')
return X, Y

# reads file in chunks of `lines_per_chunk` lines
def line_chunks(textfile, lines_per_chunk=1000):
chunk = []
i = 0
with open(textfile, 'r') as textfile:
for line in textfile:
if not line.split(): continue
i+=1
chunk.append(line.strip())
if i == lines_per_chunk:
yield chunk
i = 0
chunk = []
yield chunk
textfile = argv[1]
for i, line_chunk in enumerate(line_chunks(textfile)):
# stop processing file after 10 chunks to demonstrate
# that memory stays occupied (check your system monitor)
if i == 10:
while True:
pass
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)

我用来调试它的文件有 50000 个非空行,这就是我在一个地方使用硬编码 50000 的原因。如果您想使用相同的文件,他是一个方便您的链接:

https://www.dropbox.com/s/v7nxb7vrrjim349/de_wiki_50000_lines?dl=0

现在,当您运行此脚本并打开系统监视器时,您将观察到内存已用完,并且使用量一直持续到第 10 个块,在那里我人为地进入一个无限循环以证明内存仍在使用中,即使我从不存储任何内容。

你能向我解释为什么会这样吗?我似乎缺少一些关于应该如何使用多处理池的东西。

首先,让我们澄清一些误解——尽管事实证明,这实际上并不是首先探索的正确途径。

当你在Python中分配内存时,它当然必须从操作系统中获取内存。

但是,当您释放内存时,它很少会返回到操作系统,直到您最终退出。相反,它进入了一个"免费列表"——或者,实际上是出于不同目的的多个级别的免费列表。这意味着下次你需要内存时,Python 已经有内存了,可以立即找到它,而无需与操作系统对话来分配更多内存。这通常会使内存密集型程序更快。

但这也意味着 - 特别是在现代64位操作系统上 - 试图通过查看活动监视器/任务管理器/等来了解您是否确实存在任何内存压力问题几乎是无用的。


标准库中的tracemalloc模块提供了低级工具来查看内存使用情况的实际情况。在更高级别,您可以使用类似memory_profiler的东西(如果您启用tracemalloc支持 - 这很重要)可以将该信息与来自psutil等来源的操作系统级信息放在一起,以确定事情的发展方向。

但是,如果你没有看到任何实际问题——你的系统没有进入交换地狱,你没有得到任何MemoryError异常,你的性能没有遇到一些奇怪的悬崖,它线性地扩展到 N,然后突然在 N+1 时陷入地狱,等等——你通常不需要首先为这些而烦恼。


如果你确实发现了一个问题,那么幸运的是,你已经解决了它的一半。正如我在顶部提到的,在您最终退出之前,您分配的大多数内存都不会返回到操作系统。但是,如果所有内存使用都发生在子进程中,并且这些子进程没有状态,则可以随时使它们退出并重新启动。

当然,这样做会产生性能成本 - 进程拆卸和启动时间,以及必须重新开始的页面映射和缓存,并要求操作系统再次分配内存,等等。而且还有复杂性成本——你不能只运行一个池,让它做它的事情;你必须参与它的事情,让它为你回收过程。

multiprocessing.Pool类中没有用于执行此操作的内置支持。

当然,您可以建立自己的Pool.如果你想花哨,你可以查看源头multiprocessing并做它所做的事情。或者,您可以从Process对象列表和一对Queue中构建一个普通池。或者,您可以直接使用Process对象,而无需抽象池。


您可能遇到内存问题的另一个原因是您的个人进程很好,但您有太多的内存问题。

事实上,这里似乎就是这种情况。

在此函数中创建 4 个工作人员的Pool

def make_X_and_Y_sets(sentences, i):
print(f'start: {i}')
pool = Pool()
# ...

。你为每个块调用这个函数:

for i, line_chunk in enumerate(line_chunks(textfile)):
# ...
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)

因此,您最终会为每个块提供 4 个新流程。即使每个内存使用率都很低,一次拥有数百个内存也会加起来。

更不用说您可能会因为数百个进程在 4 个内核上竞争而严重损害您的时间性能,因此您将时间浪费在上下文切换和操作系统调度上,而不是做实际工作。

正如您在评论中指出的那样,解决此问题的方法很简单:只需为每个调用创建一个全局pool而不是一个新。


很抱歉把所有的哥伦布都弄到这里来了,但是...还有一件事...此代码在模块的顶层运行:

for i, line_chunk in enumerate(line_chunks(textfile)):
# ...
X_chunk, Y_chunk = make_X_and_Y_sets(line_chunk, i)

。这就是尝试启动池和所有子任务的代码。但是该池中的每个子进程都需要import此模块,这意味着它们最终都将运行相同的代码,并启动另一个池和一整套额外的子任务。

您可能是在Linux或macOS上运行它,其中默认startmethodfork,这意味着multiprocessing可以避免这种import,所以你没有问题。但是对于其他启动方法,这段代码基本上是一个叉弹,会消耗掉你所有的系统资源。这包括spawn,这是 Windows 上的默认启动方法。因此,如果有人有可能在Windows上运行此代码,则应将所有顶级代码放在if __name__ == '__main__':保护中。

相关内容

  • 没有找到相关文章

最新更新