如何在 Python 中安全地将数据从单个 HDF5 文件并行写入多个文件?



我正在尝试将数据(从 hdf5 格式的单个文件)写入多个文件,当任务串行执行时,它工作正常。现在我想提高效率并使用multiprocessing模块修改代码,但输出有时会出错。这是我代码的简化版本。

import multiprocessing as mp
import numpy as np
import math, h5py, time
N = 4  # number of processes to use
block_size = 300
data_sz = 678
dataFile = 'mydata.h5'
# fake some data
mydata = np.zeros((data_sz, 1))
for i in range(data_sz):
mydata[i, 0] = i+1
h5file = h5py.File(dataFile, 'w')
h5file.create_dataset('train', data=mydata)
# fire multiple workers
pool = mp.Pool(processes=N)
total_part = int(math.ceil(1. * data_sz / block_size))
for i in range(total_part):
pool.apply_async(data_write_func, args=(dataFile, i, ))
pool.close()
pool.join()

data_write_func()的结构是:

def data_write_func(h5file_dir, i, block_size=block_size):
hf = h5py.File(h5file_dir)
fout = open('data_part_' + str(i), 'w')
data_part = hf['train'][block_size*i : min(block_size*(i+1), data_sz)]  # np.ndarray
for line in data_part:
# do some processing, that takes a while...
time.sleep(0.01)
# then write out..
fout.write(str(line[0]) + 'n')
fout.close()

当我设置N=1时,它运行良好。 但是当我设置N=2N=4时,结果有时会搞砸(不是每次! 例如,在data_part_1中,我希望输出为:

301,
302,
303,
...

但有时我得到的是

0,
0,
0,
...

有时我得到

379,
380,
381,
...

我是多处理模块的新手,发现它很棘手。如果有任何建议,将不胜感激!

按照 Andriy 建议的修复fout.writemydata=...后,您的程序按预期工作,因为每个进程都会写入自己的文件。这些过程不可能相互混合。

你的probaby想要做的是使用multiprocessing.map()来为你削减你的迭代对象(所以你不需要做block_size的事情),而且它保证结果是按顺序完成的。我已经重新设计了您的代码以使用多处理映射:

import multiprocessing
from functools import partial
import pprint
def data_write_func(line):
i = multiprocessing.current_process()._identity[0]
line = [i*2 for i in line]
files[i-1].write(",".join((str(s) for s in line)) + "n")
N = 4
mydata=[[x+1,x+2,x+3,x+4] for x in range(0,4000*N,4)] # fake some data
files = [open('data_part_'+str(i), 'w') for i in range(N)]
pool = multiprocessing.Pool(processes=N)
pool.map(data_write_func, mydata)
pool.close()
pool.join()

请注意:

  • 我取自过程本身,它是 1 或 2
  • 由于现在每行都调用data_write_func,因此需要在父进程中完成文件打开。另外:您不需要手动close()文件,操作系统将在退出python程序时为您执行此操作。

现在,我想最终您希望将所有输出放在一个文件中,而不是单独的文件中。如果您的输出行在 Linux 上低于 4096 字节(或在 OSX 上低于 512 字节,对于其他操作系统,请参阅此处),您实际上可以安全地打开一个文件(在追加模式下)并让每个进程写入该文件,因为低于这些大小的写入保证是 Unix 原子的。

更新

"如果数据作为数据集存储在 hdf5 文件中怎么办?">

根据 hdf5 文档,这从 2.2.0 版开始开箱即用:

并行 HDF5

是 HDF5 库的配置,可让您在多个并行进程中共享打开的文件。它使用 MPI(消息传递接口)标准进行进程间通信

因此,如果您在代码中执行此操作:

h5file = h5py.File(dataFile, 'w')
dset = h5file.create_dataset('train', data=mydata)

然后,您可以从进程中访问 dset 并读取/写入它,而无需采取任何额外的措施。另请参阅使用多处理的 h5py 示例

无法复制该问题。这是我的完整代码:

#!/usr/bin/env python
import multiprocessing
N = 4
mydata=[[x+1,x+2,x+3,x+4] for x in range(0,4000*N,4)] # fake some data
def data_write_func(mydata, i, block_size=1000):
fout = open('data_part_'+str(i), 'w')
data_part = mydata[block_size*i: block_size*i+block_size]
for line in data_part:
# do some processing, say *2 for each element...
line = [x*2 for x in line]
# then write out..
fout.write(','.join(map(str,line))+'n')
fout.close()
pool = multiprocessing.Pool(processes=N)
for i in range(2):
pool.apply_async(data_write_func, (mydata, i, ))
pool.close()
pool.join()

data_part_0的示例输出:

2,4,6,8
10,12,14,16
18,20,22,24
26,28,30,32
34,36,38,40
42,44,46,48
50,52,54,56
58,60,62,64

多处理不能保证不同线程之间的代码执行顺序,2 个进程以与其创建顺序相反的顺序执行是完全合理的(至少在 Windows 和主流 Linux 上)

通常,当您使用并行化时,您需要工作线程来生成数据,然后将数据聚合到线程安全数据结构中并将其保存到文件中,但是您在这里写入一个文件,大概是在一块硬盘上,您有任何理由相信使用多个线程可以获得任何额外的性能吗?

相关内容

  • 没有找到相关文章

最新更新