When using Python's multiprocessing Pool.map(), I don't get my memory back. Over 1 GB of memory stays occupied, although the function with the Pool has exited, everything is closed, and I even tried deleting the Pool variable and explicitly calling the garbage collector.
In the code shown below, when the two lines above pool.map() are uncommented (and the pool.map() line is commented out), everything looks fine, but as soon as multiprocessing is used, the memory does not seem to be released again after leaving the function.
Because in the real-world code several other functions that use multiprocessing are called, this even stacks up, consuming all the memory.
(Unfortunately, I cannot provide a minimal example for the secondary case of the stacking memory, but once the main problem is solved, the secondary one should be gone too.)
This is Python 3.7.3 on Linux; any help that at least explains or even solves this issue is very welcome.
Minimal example code:
import gc
from time import sleep
from memory_profiler import profile
import numpy as np

def waitat(where, t):
    # print and wait, gives a chance to see live memory usage in some task manager program
    print(where)
    sleep(t)

@profile
def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
    from skimage.color import rgb2hsv
    import multiprocessing as mp
    print("going parallel")
    pool = mp.Pool()
    try:
        # images_converted = []  # there is no memory problem when using the commented lines below, instead of the pool.map(…) line
        # for img in imgs:
        #     images_converted.append(rgb2hsv(img))
        images_converted = pool.map(rgb2hsv, imgs)
    except KeyboardInterrupt:
        pool.terminate()
    waitat("after pool.map", 5)

    pool.close()
    pool.join()

    waitat("before del pool", 5)
    pool = None
    del pool  # memory should now be freed here?
    mp = None
    rgb2hsv = None

    waitat("after del pool", 5)
    print("copying over")
    res = np.array(images_converted)
    waitat("before del image_hsv in function", 5)
    images_converted = None
    del images_converted
    return res

@profile
def doit():
    print("create random images")
    max_images = 700
    images = np.random.rand(max_images, 300, 300, 3)

    waitat("before going parallel", 5)
    images_converted = parallel_convert_all_to_hsv(images)
    print("images_converted has %i bytes" % images_converted.nbytes)
    # how to clean up Pool's memory at latest here?

    waitat("before deleting original images", 5)
    images = None
    del images
    waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes, 10)
    images_converted = None
    del images_converted
    waitat("nearly end, memory should be as before", 15)
    gc.collect(2)
    waitat("end, memory should be as before", 15)

doit()
Output of the memory profiler, showing the problem:
$ python3 -m memory_profiler pool-mem-probs.py
create random images
before going parallel
going parallel
after pool.map
before del pool
after del pool
copying over
before del image_hsv in function
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
11 1481.2 MiB 1481.2 MiB @profile
12 def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
13 1487.2 MiB 6.0 MiB from skimage.color import rgb2hsv
14 1487.2 MiB 0.0 MiB import multiprocessing as mp
15 1487.2 MiB 0.0 MiB print("going parallel")
16 1488.6 MiB 1.4 MiB pool = mp.Pool()
17 1488.6 MiB 0.0 MiB try:
18 # images_converted = [] # there is no memory problem when using commented lines below, instead of pool.map(…) line
19 # for img in imgs:
20 # images_converted.append(rgb2hsv(img))
21 2930.9 MiB 1442.3 MiB images_converted = pool.map(rgb2hsv, imgs)
22 except KeyboardInterrupt:
23 pool.terminate()
24 2930.9 MiB 0.0 MiB waitat("after pool.map",5)
25
26 2930.9 MiB 0.0 MiB pool.close()
27 2931.0 MiB 0.1 MiB pool.join()
28
29 2931.0 MiB 0.0 MiB waitat("before del pool",5)
30 2931.0 MiB 0.0 MiB pool = None
31 2931.0 MiB 0.0 MiB del pool # memory should now be freed here?
32 2931.0 MiB 0.0 MiB mp = None
33 2931.0 MiB 0.0 MiB rgb2hsv = None
34
35 2931.0 MiB 0.0 MiB waitat("after del pool",5)
36 2931.0 MiB 0.0 MiB print("copying over")
37 4373.0 MiB 1441.9 MiB res = np.array(images_converted)
38 4373.0 MiB 0.0 MiB waitat("before del image_hsv in function",5)
39 4016.6 MiB 0.0 MiB images_converted = None
40 4016.6 MiB 0.0 MiB del images_converted
41 4016.6 MiB 0.0 MiB return res
images_converted has 1512000000 bytes
before deleting original images
memory should be as before going parallel + 1512000000 bytes
nearly end, memory should be as before
end, memory should be as before
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
43 39.1 MiB 39.1 MiB @profile
44 def doit():
45 39.1 MiB 0.0 MiB print("create random images")
46 39.1 MiB 0.0 MiB max_images = 700
47 1481.2 MiB 1442.1 MiB images = np.random.rand(max_images, 300, 300,3)
48
49 1481.2 MiB 0.0 MiB waitat("before going parallel",5)
50 4016.6 MiB 2535.4 MiB images_converted = parallel_convert_all_to_hsv(images)
51 4016.6 MiB 0.0 MiB print("images_converted has %i bytes" % images_converted.nbytes)
52 # how to clean up Pool's memory at latest here?
53
54 4016.6 MiB 0.0 MiB waitat("before deleting original images",5)
55 2574.6 MiB 0.0 MiB images = None
56 2574.6 MiB 0.0 MiB del images
57 2574.6 MiB 0.0 MiB waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
58 1132.7 MiB 0.0 MiB images_converted = None
59 1132.7 MiB 0.0 MiB del images_converted
60 1132.7 MiB 0.0 MiB waitat("nearly end, memory should be as before" ,15)
61 1132.7 MiB 0.0 MiB gc.collect(2)
62 1132.7 MiB 0.0 MiB waitat("end, memory should be as before" ,15)
Output of the non-parallel code (where the problem does not occur):
$ python3 -m memory_profiler pool-mem-probs.py
create random images
before going parallel
going parallel
after pool.map
before del pool
after del pool
copying over
before del image_hsv in function
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
11 1481.3 MiB 1481.3 MiB @profile
12 def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
13 1488.1 MiB 6.8 MiB from skimage.color import rgb2hsv
14 1488.1 MiB 0.0 MiB import multiprocessing as mp
15 1488.1 MiB 0.0 MiB print("going parallel")
16 1488.7 MiB 0.6 MiB pool = mp.Pool()
17 1488.7 MiB 0.0 MiB try:
18 1488.7 MiB 0.0 MiB images_converted = [] # there is no memory problem when using commented lines below, instead of pool.map(…) line
19 2932.6 MiB 0.0 MiB for img in imgs:
20 2932.6 MiB 2.2 MiB images_converted.append(rgb2hsv(img))
21 # images_converted = pool.map(rgb2hsv, imgs)
22 except KeyboardInterrupt:
23 pool.terminate()
24 2932.6 MiB 0.0 MiB waitat("after pool.map",5)
25
26 2932.6 MiB 0.0 MiB pool.close()
27 2932.8 MiB 0.2 MiB pool.join()
28
29 2932.8 MiB 0.0 MiB waitat("before del pool",5)
30 2932.8 MiB 0.0 MiB pool = None
31 2932.8 MiB 0.0 MiB del pool # memory should now be freed here?
32 2932.8 MiB 0.0 MiB mp = None
33 2932.8 MiB 0.0 MiB rgb2hsv = None
34
35 2932.8 MiB 0.0 MiB waitat("after del pool",5)
36 2932.8 MiB 0.0 MiB print("copying over")
37 4373.3 MiB 1440.5 MiB res = np.array(images_converted)
38 4373.3 MiB 0.0 MiB waitat("before del image_hsv in function",5)
39 2929.6 MiB 0.0 MiB images_converted = None
40 2929.6 MiB 0.0 MiB del images_converted
41 2929.6 MiB 0.0 MiB return res
images_converted has 1512000000 bytes
before deleting original images
memory should be as before going parallel + 1512000000 bytes
nearly end, memory should be as before
end, memory should be as before
Filename: pool-mem-probs.py
Line # Mem usage Increment Line Contents
================================================
43 39.2 MiB 39.2 MiB @profile
44 def doit():
45 39.2 MiB 0.0 MiB print("create random images")
46 39.2 MiB 0.0 MiB max_images = 700
47 1481.3 MiB 1442.1 MiB images = np.random.rand(max_images, 300, 300,3)
48
49 1481.3 MiB 0.0 MiB waitat("before going parallel",5)
50 2929.6 MiB 1448.3 MiB images_converted = parallel_convert_all_to_hsv(images)
51 2929.6 MiB 0.0 MiB print("images_converted has %i bytes" % images_converted.nbytes)
52 # how to clean up Pool's memory at latest here?
53
54 2929.6 MiB 0.0 MiB waitat("before deleting original images",5)
55 1487.7 MiB 0.0 MiB images = None
56 1487.7 MiB 0.0 MiB del images
57 1487.7 MiB 0.0 MiB waitat("memory should be as before going parallel + %i bytes" % images_converted.nbytes ,10)
58 45.7 MiB 0.0 MiB images_converted = None
59 45.7 MiB 0.0 MiB del images_converted
60 45.7 MiB 0.0 MiB waitat("nearly end, memory should be as before" ,15)
61 45.7 MiB 0.0 MiB gc.collect(2)
62 45.7 MiB 0.0 MiB waitat("end, memory should be as before" ,15)
Generation thresholds may be getting in the way; have a look at gc.get_threshold().
Try including
gc.disable()
There is indeed a leak problem, but with certain magic parameters it does not appear. I cannot make sense of it, but we can reduce the leak by passing a list to pool.map instead of an ndarray: images_converted = pool.map(rgb2hsv, [i for i in imgs])
This consistently reduced the memory leak in my tests.
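A minimal sketch (not from the original answer) of how both of these suggestions could be combined in the questioner's function; whether this fully eliminates the leak on a given system is not guaranteed:

import gc
import multiprocessing as mp
import numpy as np
from skimage.color import rgb2hsv

def parallel_convert_all_to_hsv(imgs: np.ndarray) -> np.ndarray:
    gc.disable()  # suggestion 1: keep the generational GC quiet while mapping
    pool = mp.Pool()
    try:
        # suggestion 2: pass a plain list of arrays instead of the big ndarray
        images_converted = pool.map(rgb2hsv, [img for img in imgs])
    finally:
        pool.close()
        pool.join()
        gc.enable()   # re-enable GC once the pool has shut down
        gc.collect()
    return np.array(images_converted)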
Old answer:
The pool does not seem to be the problem. You should not expect "del pool" in line 31 to free your memory, because what occupies it are the variables "imgs" and "images_converted". Those are in the scope of the function "parallel_convert_all_to_hsv", not of "rgb2hsv", so "del pool" has nothing to do with them.
After "images" and "images_converted" are deleted in lines 56 and 59, the memory usage is corrected.
Since multiprocessing.Pool
could not release the roughly 1 GB of memory, I also tried replacing it with ThreadPool
, but that was no better. I am still wondering about the memory leak problem in the pool.
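(Not something tried in this thread, but a commonly suggested mitigation for Pool workers that accumulate memory is the maxtasksperchild parameter, which recycles worker processes after a fixed number of tasks so their heaps are returned to the OS. A minimal sketch; the limit of 50 tasks is an arbitrary choice and there is no guarantee it cures this particular leak:)

import multiprocessing as mp
import numpy as np
from skimage.color import rgb2hsv

def convert_with_recycled_workers(imgs: np.ndarray) -> np.ndarray:
    # each worker exits and is replaced after 50 tasks,
    # handing its memory back to the OS
    with mp.Pool(maxtasksperchild=50) as pool:
        images_converted = pool.map(rgb2hsv, list(imgs))
    return np.array(images_converted)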
This may not be the optimal solution, but it can serve as a workaround.
Instead of using ThreadPool
or ProcessPool
, I create the threads or processes manually and assign each thread or process an image to convert to HSV. I have commented out the line p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
because it spawns a new process for every single image conversion, which I think would be overkill and much more expensive than threads. Obviously, creating the threads manually takes more time (9 seconds, with no memory leak) than the ThreadPool
(4 seconds, but with a memory leak), but as you can see, it stays almost flat in memory.
Here is my code:
import multiprocessing
import os
import threading
import time
from memory_profiler import profile
import numpy as np
from skimage.color import rgb2hsv

def do_hsv(img, shared_list):
    # convert one image and store the result in the manager-backed shared list
    shared_list.append(rgb2hsv(img))
    # print("Converted by process {} having parent process {}".format(os.getpid(), os.getppid()))

@profile
def parallel_convert_all_to_hsv(imgs, shared_list):
    cores = os.cpu_count()
    starttime = time.time()
    # process the images in batches of one per CPU core
    for i in range(0, len(imgs), cores):
        # print("i :", i)
        jobs = []; pipes = []
        # clamp the end of the batch to len(imgs)
        end = i + cores if (i + cores) <= len(imgs) else i + len(imgs[i : -1]) + 1
        # print("end :", end)
        for j in range(i, end):
            # print("j :", j)
            # p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
            p = threading.Thread(target=do_hsv, args=(imgs[j], shared_list))
            jobs.append(p)
        for p in jobs: p.start()
        for proc in jobs:
            proc.join()
    # note: starttime - time.time() is negative, hence the negative duration in the output below
    print("Took {} seconds to complete ".format(starttime - time.time()))
    return 1

@profile
def doit():
    print("create random images")
    max_images = 700
    images = np.random.rand(max_images, 300, 300, 3)
    # images = [x for x in range(0, 10000)]
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    parallel_convert_all_to_hsv(images, shared_list)
    del images
    del shared_list
    print()

doit()
Here is the output:
create random images
Took -9.085552453994751 seconds to complete
Filename: MemoryNotFreed.py
Line # Mem usage Increment Line Contents
================================================
15 1549.1 MiB 1549.1 MiB @profile
16 def parallel_convert_all_to_hsv(imgs, shared_list):
17
18 1549.1 MiB 0.0 MiB cores = os.cpu_count()
19
20 1549.1 MiB 0.0 MiB starttime = time.time()
21
22 1566.4 MiB 0.0 MiB for i in range(0, len(imgs), cores):
23
24 # print("i :", i)
25
26 1566.4 MiB 0.0 MiB jobs = []; pipes = []
27
28 1566.4 MiB 0.0 MiB end = i + cores if (i + cores) <= len(imgs) else i + len(imgs[i : -1]) + 1
29
30 # print("end :", end)
31
32 1566.4 MiB 0.0 MiB for j in range(i, end):
33 # print("j :", j)
34
35 # p = multiprocessing.Process(target=do_hsv, args=(imgs[j], shared_list))
36 1566.4 MiB 0.0 MiB p = threading.Thread(target= do_hsv, args=(imgs[j], shared_list))
37
38 1566.4 MiB 0.0 MiB jobs.append(p)
39
40 1566.4 MiB 0.8 MiB for p in jobs: p.start()
41
42 1574.9 MiB 1.0 MiB for proc in jobs:
43 1574.9 MiB 13.5 MiB proc.join()
44
45 1563.5 MiB 0.0 MiB print("Took {} seconds to complete ".format(starttime - time.time()))
46 1563.5 MiB 0.0 MiB return 1
Filename: MemoryNotFreed.py
Line # Mem usage Increment Line Contents
================================================
48 106.6 MiB 106.6 MiB @profile
49 def doit():
50
51 106.6 MiB 0.0 MiB print("create random images")
52
53 106.6 MiB 0.0 MiB max_images = 700
54
55 1548.7 MiB 1442.1 MiB images = np.random.rand(max_images, 300, 300,3)
56
57 # images = [x for x in range(0, 10000)]
58 1549.0 MiB 0.3 MiB manager = multiprocessing.Manager()
59 1549.1 MiB 0.0 MiB shared_list = manager.list()
60
61 1563.5 MiB 14.5 MiB parallel_convert_all_to_hsv(images, shared_list)
62
63 121.6 MiB 0.0 MiB del images
64
65 121.6 MiB 0.0 MiB del shared_list
66
67 121.6 MiB 0.0 MiB print()