我有一个相当大的数据文件(大约 150 万个条目),每个条目都被读取并用作计算的基础,然后将结果添加到结果数据结构中。单个计算的结果可能在此数据结构中重叠(例如,多个计算将被添加到数组的一个特定元素中)。这看起来大致是这样的:
import numpy as np
result = np.zeros(shape=(1000,1000))
with open("inputfile.txt", "r") as file:
for line in file:
n = calculateThings(float(line))
result += n
def calculateThings(data):
return np.where(some border condition, function(data), no mathematics)
计算本身大约需要 20-30 微秒,这没关系。但是,基础数据的大小将使这成为一个非常耗时的过程,因此我们的想法是并行化这些计算:多个进程将文件相互拆分,并行读出文件,然后它们都将计算结果添加到结果数组中。
但是,由于多个进程的性质,我不确定这是否可行(尤其是并行读出)以及我可以在多大程度上信任结果数组的准确性,因为我担心这些进程在读取或写入变量时会相互混淆。有没有人有这个特定问题的经验?
问题是:您反复向numpy
数组的每个元素添加一些值。如果您有多个进程并行执行此操作,则必须在某种锁的控制下执行此操作,这意味着无论如何,对numpy
数组的所有更新都将串行执行。这意味着您只能并行运行对calculateThings
的多次调用,这会计算所有加法。如果此功能在 CPU 方面不够昂贵,则多处理将不会产生更好的性能。
您需要的是能够将阵列存储在共享内存中,以便它可以在所有进程中共享。我假设数组必须是浮点类型。
import numpy as np
import ctypes
ARRAY_TYPE = ctypes.c_double
SHAPE = (1000, 1000)
def np_array_from_shared_array(shared_array):
return np.frombuffer(shared_array.get_obj(), ARRAY_TYPE).reshape(SHAPE[0], SHAPE[1])
def init_pool_processes(shared_array):
"""
Init each pool process.
The numpy array is created from the passed shared array and a global
variable is initialized with a reference to it.
"""
global result, lock
result = np_array_from_shared_array(shared_array)
lock = shared_array.get_lock()
def calculateThings(data):
global result
n = np.where(some_border_condition, function(data), no_mathematics)
with lock:
result += n
# Required for Windows:
if __name__ == '__main__':
from multiprocessing import Pool, Array
# Create shared memory version of a numpy array:
shared_arr = Array(ARRAY_TYPE, SHAPE[0] * SHAPE[1])
result = np_array_from_shared_array(shared_arr)
with open("inputfile.txt", "r") as file:
arguments = [float(line) for line in file]
pool = Pool(initializer=init_pool_processes, initargs=(shared_arr,))
pool.map(calculateThings, arguments)
pool.close()
pool.join()
#print(result)
如果我错了,请纠正我。进程不共享相同的内存空间,因此每个 python 对象为每个进程单独存在,在使用线程时,每个线程共享 python 模块的主内存空间。
所以我认为如果你把你的ndarray分成'n'个部分(它们是单独的对象)并只使用线程,那就没问题了。