如何在Python中使用Multiprocessing并行处理以下代码



我有一个函数generate(file_path),它返回整数索引和numpy数组。生成函数的简化如下:

def generate(file_path):
temp = np.load(file_path)
#get index from the string file_path
idx = int(file_path.split["_"][0])
#do some mathematical operation on temp
result = operate(temp)
return idx, result

我需要遍历一个目录并将generate(file_path)的结果收集到hdf5文件中。我的序列化代码如下:

for path in glob.glob(directory):
idx, result = generate(path)
hdf5_file["results"][idx,:] = result

hdf5_file.close()

我希望写一个多线程或多进程的代码来加快上面的代码。我怎么修改它?非常谢谢!

我的尝试是修改我的生成函数和修改我的"main"如下:

def generate(file_path):
temp = np.load(file_path)
#get index from the string file_path
idx = int(file_path.split["_"][0])
#do some mathematical operation on temp
result = operate(temp)

hdf5_path = "./result.hdf5"
hdf5_file = h5py.File(hdf5_path, 'w')
hdf5_file["results"][idx,:] = result
hdf5_file.close()
if __name__ == '__main__':
##construct hdf5 file
hdf5_path = "./output.hdf5"
hdf5_file = h5py.File(hdf5_path, 'w')
hdf5_file.create_dataset("results", [2000,15000], np.uint8)
hdf5_file.close()
path_ = "./compute/*"
p = Pool(mp.cpu_count())
p.map(generate, glob.glob(path_))
hdf5_file.close()

print("finished")

然而,它不起作用。它会抛出错误

KeyError: "Unable to open object (object 'results' doesn't exist)"

可以使用线程或进程池同时执行多个函数调用。下面是一个使用进程池的示例:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def generate(file_path: str) -> int:
sleep(1.0)
return file_path.split("_")[1]

def main():
file_paths = ["path_1", "path_2", "path_3"]

with ProcessPoolExecutor() as pool:
results = pool.map(generate, file_paths)

for result in results:
# Write to the HDF5 file
print(result)

if __name__ == "__main__":
main()

注意,你不应该同时写入同一个HDF5文件,即文件写入不应该附加在generate函数中。

我在初始化时检测到一些错误检查代码后的数据集;

您生成的hdf5文件的路径为"./result.hdf5">在生成函数中。

但是,我认为您忽略了创建"results">对象不存在

如果你仍然面对同样的问题错误信息

请回复

最新更新