有可能为 multiprocessing.Array 创建一个持久的 NumPy ndarray 接口吗?



我需要在多个进程之间共享一个公共的 numpy.ndarray。如果数据被打包为 multiprocessing.Array,并在每次访问数组之前临时转换为 ndarray,那么它就能工作。但如果我想只在构造函数中把数据包装成 ndarray 一次并反复重用(即使用注释掉的行,而不是用 # w 标记的行),就行不通了。

该示例在 Python 3.10.4 @ Win10 上可以工作(输出见下文),但如果我注释掉 # w 行并取消注释其余各行,则共享数组不会更新,始终保持为 [0 0 0 0](所有 "LOG" 输出行都显示 [0 0 0 0])。

问题:

  • 为什么在下面的构造函数中包装 multiprocessing.Array 对象的 ndarray 成员,在调用 _update_array 方法时不起作用?
from datetime import datetime
from multiprocessing import Process, Pipe, Manager, Array
import numpy as np
import os
import time
# Timestamp helper for worker log lines (HH:MM:SS-microseconds).
curr_time = lambda: datetime.now().strftime("%H:%M:%S-%f")

# ctypes type code for the shared Array ("i" == C int); also a valid NumPy dtype string.
DATA_TYPE = "i"


class WorkerInterface:
    """Worker that increments its own slot of a shared multiprocessing.Array.

    The instance is created in the parent process and its __call__ loop runs
    in a child process, so the object is pickled on the way there. A NumPy
    view over the shared buffer must therefore be created *after* unpickling
    (here: freshly on every access) rather than stored in the constructor —
    a view made in the parent would not wrap the child's shared buffer.
    """

    def __init__(self, data_array, worker_id):
        # NOTE(review): the commented-out variant below (wrapping the Array
        # in an ndarray once, here) is exactly what breaks after pickling.
        # self._data_lock = data_array.get_lock()
        # with self._data_lock:
        #     self._data_array = np.ndarray(
        #         (len(data_array),),
        #         dtype=DATA_TYPE,
        #         buffer=data_array.get_obj()
        #     )
        self._data_array = data_array  # w
        self.worker_id = worker_id
        self._keep_running = True

    def _data_as_numpy_array(self):  # w
        """Return a fresh NumPy view over the shared Array's buffer."""
        return np.ndarray(
            (len(self._data_array),),
            dtype=DATA_TYPE,
            buffer=self._data_array.get_obj()
        )

    @property
    def data_array(self):
        """Snapshot (copy) of the shared array, taken under its lock."""
        # with self._data_lock:
        #     return self._data_array.copy()
        with self._data_array.get_lock():  # w
            return self._data_as_numpy_array().copy()  # w

    def _update_array(self):
        """Increment this worker's slot; `with array` acquires the Array's lock."""
        # with self._data_lock:
        #     self._data_array[self.worker_id] += 1
        with self._data_array:  # w
            self._data_as_numpy_array()[self.worker_id] += 1  # w

    def __call__(self, conn):
        """Worker loop (runs in the child process): handle "msg;wait" commands."""
        while self._keep_running:
            msg, wait = conn.recv().split(";")
            wait = int(wait)
            time.sleep(wait)
            print(f"W{self.worker_id}, PID {os.getpid()}", end="")
            print(f" @ {curr_time()}>>> {msg} <{wait}s>")
            self._update_array()
            if msg == "stop":
                self._keep_running = False
                conn.close()
if __name__ == '__main__':

    NUM_WORKERS = 4

    # NOTE(review): `manager` is never used below — Array() here is the
    # module-level multiprocessing.Array, not manager.Array().
    with Manager() as manager:

        array = np.zeros((NUM_WORKERS,), dtype=DATA_TYPE)
        shared_array = Array(DATA_TYPE, array)

        print("INIT LOG", np.array(shared_array))

        # Spawn one worker process per slot, keeping the parent pipe end.
        workers = []
        for i in range(NUM_WORKERS):
            worker = WorkerInterface(shared_array, i)
            main_conn, worker_conn = Pipe()
            p = Process(target=worker, args=(worker_conn,))
            p.start()
            workers.append({"pipe": main_conn, "pid": p, "worker": worker})

        # Commands are "message;delay_seconds".
        print(f"- C0 @ {curr_time()}")
        workers[0]["pipe"].send("m1;5")
        print(f"- C1 @ {curr_time()}")
        workers[1]["pipe"].send("m2;0")
        print(f"- C2 @ {curr_time()}")
        workers[2]["pipe"].send("m3;3")
        print(f"- C3 @ {curr_time()}")
        workers[0]["pipe"].send("m5;1")

        # Stop each worker, join it, then log the parent-side array snapshot.
        for wi, w in enumerate(workers):
            w["pipe"].send("stop;0")
            print(f"- X{wi} @ {curr_time()}")
            w["pid"].join()
            print(f"- E{wi} @ {curr_time()}")
            print("LOG:", wi, w["worker"].data_array)

        print("FINAL LOG", np.array(shared_array))

输出:

INIT LOG [0 0 0 0]
- C0 @ 11:59:22-764692
- C1 @ 11:59:22-764692
- C2 @ 11:59:22-765694
- C3 @ 11:59:22-765694
- X0 @ 11:59:22-766693
- E0 @ 11:59:29-133874
LOG: 0 [3 1 1 0]
- X1 @ 11:59:29-133874
- E1 @ 11:59:29-153877
LOG: 1 [3 2 1 0]
- X2 @ 11:59:29-154877
- E2 @ 11:59:29-173877
LOG: 2 [3 2 2 0]
- X3 @ 11:59:29-174876
- E3 @ 11:59:29-195878
LOG: 3 [3 2 2 1]
FINAL LOG [3 2 2 1]
W0, PID 3056 @ 11:59:28-092878>>> m1 <5s>
W0, PID 3056 @ 11:59:29-106907>>> m5 <1s>
W0, PID 3056 @ 11:59:29-106907>>> stop <0s>
W1, PID 27116 @ 11:59:23-091694>>> m2 <0s>
W1, PID 27116 @ 11:59:29-133874>>> stop <0s>
W2, PID 19356 @ 11:59:26-095872>>> m3 <3s>
W2, PID 19356 @ 11:59:29-154877>>> stop <0s>
W3, PID 10000 @ 11:59:29-174876>>> stop <0s>

您的代码示例或许可以大幅简化,这本身就可能帮助您发现错误。不过这里有一个我经常使用的辅助类,它使用 multiprocessing.shared_memory(内部实现与 Array 非常相似)来共享 numpy 数组,并使其可以被 pickle,以便发送到子进程以及在子进程之间传递(锁仍由您自己负责):

import numpy as np
from multiprocessing import shared_memory, Process
class Shared_Arr:
    """Helper wrapping a multiprocessing.shared_memory segment as a NumPy array.

    Instances are picklable (via __reduce__) so they can be sent to child
    processes; only the creating instance (the owner) unlinks the segment on
    exit. Locking remains the caller's responsibility.
    """

    def __init__(self, shape, dtype, shm=None):
        self.shape = shape
        self.dtype = dtype
        if shm is None:
            # Fresh segment: size it for shape x dtype, and mark us as owner.
            n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
            self.shm = shared_memory.SharedMemory(create=True, size=n_bytes)
            self.owner = True
        else:
            # Re-attached (unpickled) copy: reuse the existing segment.
            self.shm = shm
            self.owner = False
        self.close = self.shm.close
        self.unlink = self.shm.unlink
        self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)

    def __reduce__(self):
        # Make it picklable so it can be sent to a child process correctly.
        return (self.__class__, (self.shape, self.dtype, self.shm))

    def __enter__(self):
        # Context manager exists mostly for cleanup, so __enter__ is trivial.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()  # closes the memory-mapped file
        if self.owner:
            self.unlink()  # tell the OS to delete the file
def populate_arr(shared, value):
    """Child-process target: fill the shared array with `value`.

    The context manager closes (and, for the owner, unlinks) the segment
    when done; without it you could just call shared.close() manually.
    """
    with shared:
        shared.arr[:] = value
if __name__ == "__main__":
    with Shared_Arr([10], int) as shared:
        # np.ndarray over raw shared memory is uninitialized (like np.empty),
        # so zero it explicitly before use.
        shared.arr[:] = 0
        print(shared.arr)  # before modification
        p = Process(target=populate_arr, args=(shared, 5))
        p.start()
        p.join()
        print(shared.arr)  # after being modified in a separate process

您的类被设计为在主进程中实例化的 worker,然后它的循环函数(__call__)被转移到另一个进程去执行。在这种情况下,实例(对象)会被 pickle 并以序列化的形式发送到另一个进程;在那里它被反序列化(unpickle),然后运行该副本的工作循环函数。

如果共享数组被封装在构造函数中的NumPy数组中,那么底层共享数组在pickle和unpickle过程中会被破坏,因此无法正常工作。

重要的是:要么在工作循环开始时(即在子进程中)重新包装共享的 multiprocessing.Array 实例,要么通过重新定义 __getstate__ 和 __setstate__ 方法,在 pickle/unpickle 时正确处理包装。

一个带有可正常工作的 NumPy 接口、包装共享 Array 的类:

class WorkerInterface:
    """Worker with a persistent NumPy view over a shared multiprocessing.Array.

    The NumPy view (_data_array) is excluded from the pickled state and is
    rebuilt from the raw shared Array after unpickling in the child process,
    so the view always wraps the buffer that is live in the current process.
    """

    def __init__(self, worker_id, data_array, array_shape=None):
        self._raw_data_array = data_array
        self.worker_id = worker_id
        # Shape defaults to flat; dtype is derived from the Array's ctypes type.
        self.shape = array_shape or (len(self._raw_data_array),)
        self.dtype = np.dtype(self._raw_data_array.get_obj()._type_)
        self._wrap_the_array()
        self._keep_running = True

    @property
    def data_array(self):
        """Snapshot (copy) of the shared array, taken under its lock."""
        with self._raw_data_array.get_lock():
            return self._data_array.copy()

    def _wrap_the_array(self):
        # (Re)create the NumPy view over the shared Array's buffer.
        self._data_array = np.ndarray(
            self.shape,
            dtype=self.dtype,
            buffer=self._raw_data_array.get_obj()
        )

    def __getstate__(self):
        # Drop the NumPy view: it cannot be pickled along with the shared buffer.
        return {
            key: self.__dict__[key]
            for key in self.__dict__.keys()
            if key != "_data_array"
        }

    def __setstate__(self, state):
        self.__dict__ = state
        self._wrap_the_array()  # rebuild the view in the receiving process

    def __call__(self, conn):
        """Worker loop (runs in the child process): handle "msg;wait" commands."""
        while self._keep_running:
            msg, wait = conn.recv().split(";")
            wait = int(wait)
            time.sleep(wait)
            print(f"W{self.worker_id}, PID {os.getpid()}", end="")
            with self._raw_data_array.get_lock():
                self._data_array[self.worker_id] += wait
            print(f" @ {curr_time()} > {msg} <{wait}s>: {self._data_array}")
            if msg == "stop":
                self._keep_running = False
                conn.close()

最新更新