使用Python多处理共享对象数组



对于这个问题,我参考Python文档中讨论";使用具有NumPy数组的SharedMemory类,从两个不同的Python shell访问相同的numpy.ndarray";。

我想实现的一个主要变化是操作类对象数组,而不是如下所示的整数值。

import numpy as np
from multiprocessing import shared_memory    
# a simplistic class example
class A(): 
    def __init__(self, x): 
        self.x = x
# numpy array of class objects 
a = np.array([A(1), A(2), A(3)])       
# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')
# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)                                    
# copy the original data into shared memory
b[:] = a[:]                                  
print(b)                                            
# array([<__main__.Foo object at 0x7fac56cd1190>,
#       <__main__.Foo object at 0x7fac56cd1970>,
#       <__main__.Foo object at 0x7fac56cd19a0>], dtype=object)

现在,在另一个shell中,我们附加到共享内存空间,并尝试操作数组的内容。

import numpy as np
from multiprocessing import shared_memory
# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')
c = np.ndarray((3,), dtype=object, buffer=existing_shm.buf)

甚至在我们能够操作c之前,打印它就会导致分割错误。事实上,我不能期望观察到尚未写入模块的行为,所以我的问题是我能做些什么来处理共享的对象数组?

我目前正在浏览这个列表,但受保护的读/写会增加一些开销。我还尝试过使用Namespace,它非常慢,因为不允许使用索引写入。另一个想法可能是在ShareableList中使用share Ctypes Structure,但我不知道从哪里开始。

此外,还有一个设计方面:shared_memory中似乎有一个开放的bug,可能会影响我的实现,其中我有几个进程在处理数组的不同元素。

是否有一种更具可扩展性的方法可以在多个进程之间共享一个大型对象列表,以便在任何给定时间,所有正在运行的进程都与列表中的唯一对象/元素进行交互?

UPDATE:在这一点上,我还将接受部分答案,讨论是否可以使用Python实现这一点。

因此,我做了一些研究(多处理器中的共享内存对象(,并提出了一些想法:

传递numpy字节数组

序列化对象,然后将它们作为字节字符串保存到numpy数组中。这里的问题是

  1. 需要将数据类型从'psm_test0'的创建者传递给'psm_test0'的任何使用者。不过,这可以通过另一个共享内存来完成。

  2. pickleunpickle本质上类似于deepcopy,即它实际上复制底层数据。

"主"进程的代码为:

import pickle
from multiprocessing import shared_memory
import numpy as np

# a simplistic class example
class A():
    def __init__(self, x):
        self.x = x
    def pickle(self):
        return pickle.dumps(self)
    @classmethod
    def unpickle(self, bts):
        return pickle.loads(bts)

if __name__ == '__main__':
    # Test pickling procedure
    a = A(1)
    print(A.unpickle(a.pickle()).x)
    # >>> 1
    # numpy array of byte strings
    a_arr = np.array([A(1).pickle(), A(2).pickle(), A('This is a really long test string which should exceed 42 bytes').pickle()])
    # create a shared memory instance
    shm = shared_memory.SharedMemory(
        create=True,
        size=a_arr.nbytes,
        name='psm_test0'
    )
    # numpy array backed by shared memory
    b_arr = np.ndarray(a_arr.shape, dtype=a_arr.dtype, buffer=shm.buf)
    # copy the original data into shared memory
    b_arr[:] = a_arr[:]
    print(b_arr.dtype)
    # 'S105'

对于消费者

import numpy as np
from multiprocessing import shared_memory
from test import A
# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')
c = np.ndarray((3,), dtype='S105', buffer=existing_shm.buf)
# Test data transfer
arr = [a.x for a in list(map(A.unpickle, c))]
print(arr)
# [1, 2, ...]

我想说你有几种前进的方式:

  1. 使用简单的数据类型。

  2. 使用C api实现一些东西,但我在这方面真的帮不了你。

  3. 使用Rust

  4. 使用管理器。您可能会失去一些性能(不过我希望看到一个真正的基准测试(,但您可以为共享对象获得一个相对安全和简单的接口。

  5. 使用Redis,它也有Python绑定。。。

相关内容

  • 没有找到相关文章

最新更新