Multiprocessing a series of loops in Python?



I have a very large array (more than 10^7 columns) that needs to be filtered/modified according to certain criteria. There are 24 different sets of criteria (2x4x3 due to combinations), which means the filtering/modification needs to be performed 24 times, with each result saved in a different, dedicated directory.

Since this takes a long time, I am looking into using multiprocessing to speed the process up. Can anyone help me? Below is some sample code:

import itertools
import numpy as np

sample_size = 1000000
variables = 25
x_array = np.random.rand(variables, sample_size)

main_dir = '.'  # base output directory (not defined in the original snippet)
x_dir = ['x1', 'x2']
y_dir = ['y1', 'y2', 'y3', 'y4']
z_dir = ['z1', 'z2', 'z3']
x_directories = [0, 1]
y_directories = [0, 1, 2, 3]
z_directories = [0, 1, 2]

directory_combinations = itertools.product(x_directories, y_directories, z_directories)
for k, t, h in directory_combinations:
    target_dir = main_dir + '/' + x_dir[k] + '/' + y_dir[t] + '/' + z_dir[h]
    for i in range(sample_size):
        # x_array gets filtered/modified
        ...
    # x_array gets saved in target_dir as a dataframe after modification

Basically, with multiprocessing I would like each of the 24 loops to be handled by a single core out of the 16 I have available, or alternatively to speed up each loop iteration by using all 16 cores.

Many thanks in advance!

Take the loop and rewrite it as a function, so that

for k, t, h in directory_combinations:

becomes, for example:

import multiprocessing

def func(k, t, h):
    ...  # filter/modify and save for this one combination

pool = multiprocessing.Pool(12)
result = pool.starmap_async(func, directory_combinations, chunksize=32)
result.wait()  # block until every task has finished

This spawns 12 processes that apply func to each tuple of the three arguments; the argument tuples are shipped to the worker processes in chunks of 32. Since starmap_async returns immediately, you have to wait on its AsyncResult (or close and join the pool) before the program exits. Note also that with only 24 combinations a chunksize of 32 would put all of them into one chunk handled by a single process, so a small chunksize (or the default) is the better choice here.
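For reference, here is a minimal self-contained sketch of this pattern; process_combo and its body are hypothetical placeholders for the real filtering/saving work:

import itertools
import multiprocessing

def process_combo(k, t, h):
    # placeholder: filter/modify the data and save it for this combination
    return (k, t, h)

if __name__ == '__main__':
    combos = list(itertools.product([0, 1], [0, 1, 2, 3], [0, 1, 2]))
    with multiprocessing.Pool(12) as pool:
        # chunksize=1 keeps all 12 workers busy with only 24 tasks
        result = pool.starmap_async(process_combo, combos, chunksize=1)
        print(result.get())  # wait for and collect all 24 results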

The code below first creates x_array in shared memory and initializes each process in the pool with a global variable x_array that refers to this shared array.

The code that makes a copy of this global x_array, processes the copy, and writes the resulting dataframe out has been moved into the function worker, which is passed the target directory as its argument.

import itertools
import numpy as np
import ctypes
import multiprocessing as mp

SAMPLE_SIZE = 1000000
VARIABLES = 25

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    '''Copy a numpy array into a new shared memory Array.'''
    shared_array = mp.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def init_pool(shared_array, shape):
    global x_array
    # Recreate x_array in each pool process on top of the shared memory array:
    x_array = to_numpy_array(shared_array, shape)

def worker(target_dir):
    # Make a private copy of the shared x_array for this task:
    x_array_copy = np.copy(x_array)
    for i in range(SAMPLE_SIZE):
        # x_array_copy gets filtered/modified
        ...
    # x_array_copy gets saved in target_dir as a dataframe after modification

def main():
    main_dir = '.'  # for example
    x_dir = ['x1', 'x2']
    y_dir = ['y1', 'y2', 'y3', 'y4']
    z_dir = ['z1', 'z2', 'z3']
    x_directories = [0, 1]
    y_directories = [0, 1, 2, 3]
    z_directories = [0, 1, 2]
    directory_combinations = itertools.product(x_directories, y_directories, z_directories)
    target_dirs = [main_dir + '/' + x_dir[k] + '/' + y_dir[t] + '/' + z_dir[h]
                   for k, t, h in directory_combinations]
    x_array = np.random.rand(VARIABLES, SAMPLE_SIZE)
    shape = x_array.shape
    # np.random.rand yields float64 values, so the shared array must be
    # c_double (c_int64 would make the shared view reinterpret the bytes):
    shared_array = to_shared_array(x_array, ctypes.c_double)
    # Recreate x_array using the shared memory array as its base:
    x_array = to_numpy_array(shared_array, shape)
    # Create a pool of 12 processes, handing the shared array to each one:
    pool = mp.Pool(12, initializer=init_pool, initargs=(shared_array, shape))
    pool.map(worker, target_dirs)
    pool.close()
    pool.join()

# This guard is required for Windows:
if __name__ == '__main__':
    main()
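As a side note, on Python 3.8+ the standard multiprocessing.shared_memory module is an alternative to the ctypes-backed mp.Array used above. Here is only a minimal sketch of that approach, not code from this answer; the worker body and the directory list are placeholders:

import numpy as np
from multiprocessing import Pool
from multiprocessing import shared_memory

SHAPE = (25, 1000000)  # VARIABLES x SAMPLE_SIZE
DTYPE = np.float64

def worker(args):
    shm_name, target_dir = args
    # Attach to the existing shared memory block by name:
    shm = shared_memory.SharedMemory(name=shm_name)
    x_array = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
    x_array_copy = x_array.copy()  # private copy to filter/modify
    del x_array   # drop the view before closing the block
    shm.close()   # detach from the block, but do not destroy it
    # ... filter/modify x_array_copy and save it under target_dir ...

def main():
    src = np.random.rand(*SHAPE)
    shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
    try:
        # Copy the data into the shared block via a temporary view:
        np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)[:] = src
        target_dirs = ['./x1/y1/z1']  # placeholder for the 24 real directories
        with Pool(12) as pool:
            pool.map(worker, [(shm.name, d) for d in target_dirs])
    finally:
        shm.close()
        shm.unlink()  # release the shared memory block

if __name__ == '__main__':
    main()

The main difference is that each worker attaches to the block by name instead of inheriting it through a pool initializer, and the creating process must unlink() the block when it is done.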
