我正在尝试将List[np.ndarray]
加载到shared_memory
中,以便其他进程可以直接访问此shared_memory
并恢复原始List[np.ndarray]
,而无需将List[np.ndarray]
复制到每个进程中。详细的动机与我之前的问题有关:在多处理之间共享只读通用复杂python对象,并将int、numpy数组列表、元组等作为实例字段
我写了以下代码(python版本:3.8.12,Numpy:1.20.3,MacOS(:
encode_nd_arr_list()
:给定List[np.ndarray]
,我可以得到List of share_memory name
。decode_nd_arr_list()
:给定List of share_memory name
,我可以恢复原来的List[np.ndarray]
。
from typing import List, Tuple
import numpy as np
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager
def encode_nd_arr_list(
smm: SharedMemoryManager,
nd_arr_list: List[np.ndarray]
):
shm_name_list = []
shape, dtype = nd_arr_list[0].shape, nd_arr_list[0].dtype
print(shape)
print(dtype)
for nd_arr in nd_arr_list:
shm = smm.SharedMemory(size=nd_arr.nbytes)
shm_arr = np.ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
np.copyto(shm_arr, nd_arr)
shm_name_list.append(shm.name)
return shm_name_list, shape, dtype
def decode_nd_arr_list(
shm_name_list: List[str],
shape: Tuple[int],
dtype: np.dtype
):
nd_array_list = []
for shm_name in shm_name_list:
print("----------")
shm = SharedMemory(shm_name)
nd_arr = np.ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
print("nd_arr:", nd_arr)
nd_array_list.append(nd_arr)
print("nd_array_list:", nd_array_list)
return nd_array_list
if __name__ == '__main__':
arr = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])
nd_arr_list = [arr, arr + 1, arr + 2]
print(nd_arr_list)
with SharedMemoryManager() as smm:
shm_name_list, shape, dtype = encode_nd_arr_list(smm, nd_arr_list)
print(shm_name_list)
print(shape)
print(dtype)
res = decode_nd_arr_list(shm_name_list, shape, dtype)
print("------------")
print(res)
但是,当我在PyCharm
中运行它时,控制台显示Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)
。当我在终端中运行它时,它显示segmentation fault
,没有任何错误信息。
我的问题:
在我的情况下,这个错误意味着什么?
如何使我的代码正常工作?谢谢
decode_nd_arr_list
方法中循环的每次迭代中使用的缓冲区在相应的SharedMemory
对象超出范围并导致segfault后关闭。您实际上是在试图访问一个不再有效的内存。
为了修复它,您可以创建一个自定义对象,该对象环绕ndarray并存储SharedMemory
以防止它超出范围。
示例:
from typing import List, Tuple
import numpy as np
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager
class SHMArray(np.ndarray):
def __new__(cls, input_array, shm=None):
obj = np.asarray(input_array).view(cls)
obj.shm = shm
return obj
def __array_finalize__(self, obj):
if obj is None: return
self.shm = getattr(obj, 'shm', None)
def encode_nd_arr_list(
smm: SharedMemoryManager,
nd_arr_list: List[np.ndarray]
):
shm_name_list = []
shape, dtype = nd_arr_list[0].shape, nd_arr_list[0].dtype
print(shape)
print(dtype)
for nd_arr in nd_arr_list:
shm = smm.SharedMemory(size=nd_arr.nbytes)
shm_arr = np.ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
np.copyto(shm_arr, nd_arr)
shm_name_list.append(shm.name)
return shm_name_list, shape, dtype
def decode_nd_arr_list(
shm_name_list: List[str],
shape: Tuple[int],
dtype: np.dtype
):
nd_array_list = []
for shm_name in shm_name_list:
print("----------")
shm = SharedMemory(shm_name)
nd_arr = SHMArray(np.ndarray(shape=shape, dtype=dtype, buffer=shm.buf), shm)
print("nd_arr:", nd_arr)
nd_array_list.append(nd_arr)
print("nd_array_list:", nd_array_list)
return nd_array_list
if __name__ == '__main__':
arr = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])
nd_arr_list = [arr, arr + 1, arr + 2]
print(nd_arr_list)
with SharedMemoryManager() as smm:
shm_name_list, shape, dtype = encode_nd_arr_list(smm, nd_arr_list)
print(shm_name_list)
print(shape)
print(dtype)
res = decode_nd_arr_list(shm_name_list, shape, dtype)
print("------------")
print(res)
参考:https://github.com/numpy/numpy/issues/18294#issuecomment-771329575