我有两个长度N
的大型 2D 方形numpy
数组。
我有一个函数,它接收每个数组的行索引,对各自的行进行操作,然后将单个结果向量放在第三个数组中。
我的问题是我想并行化这个函数,但是,我不确定如何去做。我一直在查看 Pythonmultiprocessing
库中的pool
对象,但我不清楚数据管理。
根据我目前的理解,pool.apply_async()
-方法接收一个可迭代对象,然后通过各个进程将其拆分以进行计算。给它一个元组列表是否合理,其中元组中的每个元素都是numpy
数组之一的一行?
或者,有没有办法让每个进程一次性将数组加载到内存中,然后继续使用这些加载的数组来执行函数的每次执行?在这种情况下,可迭代对象将再次是元组列表,但是每个元组将在内存中保存数组的一对索引,而不是行本身。
最后,有没有办法让每个进程将结果向量提交到所有工作线程共享的单个数据对象中,然后我可以保存该对象?
您可以让每个进程保留自己的输入数组副本。但它们无法写入共享输出数组;这就是使用子进程而不是线程的重点。(在线程中,全局解释器锁可能会阻止线程并发运行。
在Linux(可能还有MacOS(中,初始化的全局变量将由具有写入时复制的子进程继承;只要子进程不尝试写入,变量就会使用共享内存。在 Windows 中,您必须为每个工作线程初始化此类全局变量。
这是如何执行此操作:
import numpy as np
from multiprocessing import Pool
PERSISTENT_DATA = {}
def func(ij):
i, j = ij
return PERSISTENT_DATA['a'][i] + PERSISTENT_DATA['b'][j]
def init_persistent_data(a, b):
PERSISTENT_DATA['a'] = a
PERSISTENT_DATA['b'] = b
def run_parallel():
n, m = 10, 5
np.random.seed(1)
a = np.random.randint(10, size=(n, m))
b = np.random.randint(10, size=(n, m))
# In Linux, these are inherited by the subprocesses.
init_persistent_data(a, b)
ij_tuples = [(0, 1), (1, 2)]
# In Linux, leave the initializer and initargs out.
with Pool(
processes=4,
initializer=init_persistent_data,
initargs=(a, b)
) as pl:
result = pl.map(func, ij_tuples)
result = np.array(result)
if __name__ == '__main__':
run_parallel()