python中读取文件和附加到列表的多重处理



我有一大组单独的文件,我正在尝试读取它们并将它们附加到一个npy文件中。为了加快进程,我想使用多处理。以下是我根据在这里找到的一些讨论编写的代码。然而,速度几乎与在不进行多处理的情况下逐个读取文件相同。它是否受到磁盘I/O速度的限制,或者我的代码有什么问题?非常感谢你的帮助。

更新:该过程是在群集上完成的。我检查了一个简单的cp -r directory directory命令的I/O速度,它可以达到1GB/s。但我下面的代码只能达到200 mb/s。换句话说,它可能不受I/O速度的限制。有人能检查一下我的代码写得是否正确吗?

import numpy as np
import os, re
from itertools import repeat
from multiprocessing import Process, Manager, Pool
import multiprocessing

def compressFile(inputs, targets, lock, rootdir, filename):
with lock:
data = np.load(os.path.join(rootdir, filename))
inputs.append(data)
targets.append('a' in filename) # a second list to store something about the file name

if __name__ == '__main__':
rootdir = 'directory'
with Manager() as manager, Pool(multiprocessing.cpu_count()) as pool:
lock = manager.Lock()
inputs = manager.list()
targets = manager.list()
file_list = [f for f in os.listdir(rootdir) if '.npy' in f]
pool.starmap(compressFile, zip(repeat(inputs), repeat(targets), repeat(lock), repeat(rootdir), file_list))
np.save(os.path.join(rootdir, 'inputs.npy'), np.asarray(inputs))
np.save(os.path.join(rootdir, 'targets.npy'), np.asarray(targets))

Read文件只有在锁外时才能并发执行。如果这只是一个IO操作。使用线程池可能会更快。但是解析到npy的文件也需要计算。您可以比较线程池和执行更快的进程池

import numpy as np
import os, re
from itertools import repeat
from multiprocessing import Process, Manager, Pool
import multiprocessing

def compressFile(inputs, targets, lock, rootdir, filename):
data = np.load(os.path.join(rootdir, filename))
with lock:

inputs.append(data)
targets.append('a' in filename) # a second list to store something about the file name

if __name__ == '__main__':
rootdir = 'directory'
with Manager() as manager, Pool(multiprocessing.cpu_count()) as pool:
lock = manager.Lock()
inputs = manager.list()
targets = manager.list()
file_list = [f for f in os.listdir(rootdir) if '.npy' in f]
pool.starmap(compressFile, zip(repeat(inputs), repeat(targets), repeat(lock), repeat(rootdir), file_list))
np.save(os.path.join(rootdir, 'inputs.npy'), np.asarray(inputs))
np.save(os.path.join(rootdir, 'targets.npy'), np.asarray(targets))

最新更新