Populating an empty array in parallel processing



Suppose I have a myfunc that takes inputs i and j, computes their sum, and fills a passed-in array with the answer. Here is what the problem looks like:

import numpy as np
from functools import partial
from multiprocessing import Pool
def myfunc(i: int, j: int, some_array: np.ndarray):
    ans = i+j
    some_array[i,j] = ans

some_array = np.zeros(shape = (2,2))

execute = partial(myfunc, some_array = some_array)
for i in range(2):
    for j in range(2):
        execute(i,j)
print(some_array)

[[0. 1.]
 [1. 2.]]

Now, imagine I want to parallelize this code. The way I do this is as follows:

iter = [(i,j) for i in range(2) for j in range(2)]

with Pool() as p:
    p.starmap(execute, iterable = iter)

This does not update the empty array on each call to execute with different args; the final array is all zeros. This may be because p.starmap produces a list of all the results at the end, but assuming execute is called once per element of the iterable, it should run some_array[i,j] = ans on every call.

Any ideas/help would be greatly appreciated.

The biggest problem here is that separate processes have separate memory, so when execute is called it uses a different copy of some_array. The copy of some_array in the main process is never updated, and the result never changes (all zeros). There are two approaches: message passing and shared memory.

Most multiprocessing.Pool functions already have a mechanism for returning a value from the target function, which works by pickling the result and sending it back to the main process over a Queue. The upside is that this is very flexible and can handle many types of data (anything that can be pickled); the downside is that you have to reassemble the data in the main process, and the sending carries considerable overhead (it's slow). A sketch of this approach appears at the end of this answer.

The other solution is to ask the OS to carve out a chunk of memory that both processes can access. This is quite fast, but requires some setup (and cleanup), and is limited to data that can be represented by a binary buffer. Fortunately, numpy can create an array from an existing binary buffer (our shared memory block). I have a helper class that I've tweaked over time to simplify the bookkeeping of shared memory:

import numpy as np
from multiprocessing import shared_memory, Process
class Shared_Arr: #helper class to make shared_memory arrays easier
    def __init__(self, shape, dtype, shm=None):
        self.shape=shape
        self.dtype=dtype
        if shm is None:
            n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
            self.shm = shared_memory.SharedMemory(create=True, size=n_bytes)
            self.owner = True
        else:
            self.shm = shm
            self.owner = False
        self.close = self.shm.close
        self.unlink = self.shm.unlink
        self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)

    def __reduce__(self): #make it picklable so it can be sent to a child process correctly
        return (self.__class__, (self.shape, self.dtype, self.shm))

    def __enter__(self): #context manager is mostly for cleanup so __enter__ is uninteresting
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close() #closes the memory-mapped file
        if self.owner:
            self.unlink() #tell the OS to delete the file

def populate_arr(shared, value):
    with shared: #without the context manager you could just manually call shared.close() when you're done with it
        shared.arr[:] = value

if __name__ == "__main__":
    with Shared_Arr([10], int) as shared:
        shared.arr[:] = 0 #np.ndarray may operate like np.empty? initialize to zero
        print(shared.arr) #before modification
        p = Process(target=populate_arr, args=(shared, 5))
        p.start()
        p.join()
        print(shared.arr) #after being modified in a separate process
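
Applied to the question's example, usage with Pool.starmap might look like the following sketch (it assumes the Shared_Arr class above; fill_cell is a hypothetical stand-in for myfunc, and on POSIX the resource tracker may warn about blocks attached inside pool workers):

import numpy as np
from multiprocessing import Pool

def fill_cell(i: int, j: int, shared: Shared_Arr):
    # each worker unpickles a Shared_Arr attached to the same block,
    # so this write lands in memory the parent process can see
    shared.arr[i, j] = i + j
    # no explicit close() here; the worker's mapping is released when
    # the pool process exits

if __name__ == "__main__":
    with Shared_Arr((2, 2), np.float64) as shared:
        shared.arr[:] = 0
        args = [(i, j, shared) for i in range(2) for j in range(2)]
        with Pool() as p:
            p.starmap(fill_cell, args)
        print(shared.arr)  # [[0. 1.]
                           #  [1. 2.]]

Nothing but None return values travels back through the pool's queue; all of the actual data movement happens through the shared buffer.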

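For comparison, the message-passing route needs no shared-memory bookkeeping at all: have the target function return (i, j, ans) and reassemble the array in the parent. A minimal sketch reusing the question's names:

import numpy as np
from multiprocessing import Pool

def myfunc(i: int, j: int):
    # return the result instead of mutating shared state; starmap pickles
    # each return value and sends it back to the main process
    return i, j, i + j

if __name__ == "__main__":
    pairs = [(i, j) for i in range(2) for j in range(2)]
    with Pool() as p:
        results = p.starmap(myfunc, pairs)
    some_array = np.zeros(shape=(2, 2))
    for i, j, ans in results:  # reassemble in the main process
        some_array[i, j] = ans
    print(some_array)

For a 2x2 array the pickling overhead is irrelevant; shared memory only starts to pay off when the arrays get large.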