Python的多处理共享内存以内存损坏告终



我正试图使用队列将SharedMemory引用传递给已经在运行的进程。问题是,一旦我在另一个进程上接收(或获取)SharedMemory对象,相应的内存块似乎根本不匹配,甚至大小太大。

import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory

def f(q):
shared_memory = q.get()
print(f"In Process: {shared_memory=}")
x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)
print(f"In Process: {x=}")

if __name__ == '__main__':
temp_array = np.arange(8)
print(f"Main: {temp_array=}")
smh = SharedMemory(create=True, size=temp_array.nbytes)
print(f"Main: {smh=}")
fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
print(f"Main: {fix_array=}")
queue = mp.Queue()
proc = mp.Process(target=f, args=(queue,))
proc.start()
queue.put(smh)

如果我运行这个代码,它会输出以下输出:

Main: temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
Main: smh=SharedMemory('wnsm_2202c81b', size=32)
Main: fix_array=array([0, 0, 0, 0, 0, 0, 0, 0])
In Process: shared_memory=SharedMemory('wnsm_2202c81b', size=4096)
In Process: x=array([0., (weird very small numbers and many many zeros...), 0.])

我本想把原来的temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])取回

根据文档,内存大小可能不匹配。此外,我用一个包含1e6项的数组对其进行了测试,只传递SharedMemory的名称,并使用Pipe代替Queue,但仍然相同。

是我做错了什么,还是这是个bug?

(我使用的是Windows 10 Build 19043,Python 3.9.6 64位)

感谢@Timus

我认为最好将其分为两个问题来解决:

问题1,奇怪的数字:

如果你通过x=np.frombuffer(buffer=shared_memory.buf,dtype=np.int32)调整f的定义,你会得到你的数字(那是初始类型)。

正如@Timus所指出的,错误是数据类型不匹配:np.arange()返回带有dtype=np.int32np.ndarray,但我试图获得带有dtype=np.float64的数组,因此出现错误结果。

修复:

@Timus的解决方案或添加dtype=np.float64作为np.arange()的参数,使其读取:temp_array = np.arange(8, dtype=np.float)


问题2,数组过长:

根据Python文档,SharedMemory.size可能比原来更大。因此,数组的长度也可能有所不同。

修复/解决方法:

将数组修剪为其原始大小,例如使用numpy.resize()。为此,还需要将原始CCD_ 11传递给CCD_。虽然这对我来说很好,但以下一点对其他人来说可能是个问题:由于x只是缓冲区的一个视图,所以np.ndarray.resize()不可用(它不拥有自己的数据)。使用numpy.resize(),将进行复制,并且对调整大小的副本所做的更改不会反映在主进程中!为了适应这种情况,CCD_ 16的值可以被复制回CCD_。


固定代码现在看起来像这样:

import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def f(q):
shared_memory, shape = q.get()  # the shape is passed here
x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)  # dtype matches
# x = np.trim_zeros(x, "b"), this doesn't work if there are zeros in the dataset
x_resized = np.resize(x, new_shape=shape)  # changes not reflected on main process
###
# make things to x_resized
###
x[:8] = x_resized[:] # copy changes back to x
if __name__ == '__main__':
temp_array = np.arange(8, dtype=np.float64) # dtype is correctly specified

smh = SharedMemory(create=True, size=temp_array.nbytes)
fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
queue = mp.Queue()
proc = mp.Process(target=f, args=(queue,))
proc.start()
queue.put((smh, temp_array.shape)) # passing the original shape

奇怪的是,虽然第二个进程中的x太长,但在主进程中,fix_array仍然保持正确的大小。。。

最新更新