我有大量的文件,我想递归并执行 md5 校验和。
其中许多文件存储在多个物理磁盘上,但都挂载在同一目录中:
/mnt/drive1/dir1/file.jpg
/mnt/drive2/dir1/file2.jpg
如何在不将整个目录和文件结构加载到内存中的情况下通过/mnt 递归?
有没有办法用多个线程做到这一点?可能没有必要使用多个线程/进程递归遍历目录,但文件操作可能是 CPU 密集型的,这将受益于多个 CPU 内核。
提前谢谢。
为什么不使用简单的os.walk?它不会占用任何大量内存。
import os
for root, dirs, files in os.walk('/mnt'):
for name in files:
print os.path.join(root, name)
扩展一下我的评论,这段代码创建了一个进程池(在命令行上给出的大小(,它会打开当前目录下的每个文件,压缩 50 次,并计算它的 CRC。然后,它将所有CRCXOR放在一起并打印出来。
import multiprocessing
import os
import sys
import zlib
NUM_PROCS = int(sys.argv[1])
def processFile(filepath):
infile = open(filepath, 'r')
contents = infile.read()
for i in xrange(50):
contents = zlib.compress(contents)
return zlib.crc32(contents)
def generateFilepaths():
for (dirpath, dirnames, filenames) in os.walk('.'):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
yield filepath
if __name__ == '__main__':
pool = multiprocessing.Pool(NUM_PROCS)
fullCrc = 0
for crc in pool.imap_unordered(processFile, generateFilepaths()):
fullCrc ^= crc
print fullCrc
请注意,如果不对每个文件进行如此荒谬的工作,该程序仍将完全受 IO 约束。因此,线程很可能会为您带来很少的速度。
import multiprocessing
import os.path
import hashlib
import sys
VALID_EXTENSIONS = ('.JPG', '.GIF', '.JPEG')
MAX_FILE_SZ = 1000000
def md5_file(fname):
try:
with open(fname) as fo:
m = hashlib.md5()
chunk_sz = m.block_size * 128
data = fo.read(chunk_sz)
while data:
m.update(data)
data = fo.read(chunk_sz)
md5_file.queue.put((fname, m.hexdigest()))
except IOError:
md5_file.queue.put((fname, None))
def is_valid_file(fname):
ext = os.path.splitext(fname)[1].upper()
fsz = os.path.getsize(fname)
return ext in VALID_EXTENSIONS and fsz <= MAX_FILE_SZ
def init(queue):
md5_file.queue = queue
def main():
# Holds tuple (fname, md5sum) / md5sum will be none if an IOError occurs
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(None, init, [queue])
for dirpath, dirnames, filenames in os.walk(sys.argv[1]):
# Convert filenames to full paths...
full_path_fnames = map(lambda fn: os.path.join(dirpath, fn),
filenames)
full_path_fnames = filter(is_valid_file, full_path_fnames)
pool.map(md5_file, full_path_fnames)
# Dump the queue
while not queue.empty():
print queue.get()
return 0
if __name__ == '__main__':
sys.exit(main())
可能不是防弹的,但它对我有用。 您可能需要对其进行调整以提供有关其正在执行的操作的一些反馈。
出于某种奇怪的原因,您无法共享全局队列。 因此,我不得不使用游泳池的initializer
功能。 我不确定为什么会这样。
只需将要处理的根目录作为唯一的参数传递,完成后它将转储出 md5 总和。
不确定是否有一个普遍正确的答案,但也许你想从简单的东西开始,基准测试,看看瓶颈是什么。
选项 1
生成带有find /mnt -type f
的文件列表并将其传递给脚本。这可以使用多个工作线程(multiprocessing
(完美并行化,但您可能希望根据物理设备拆分列表,以便每个工作线程都由一个线程处理。如果连接的存储速度很慢,这可能很重要。
选项 2
生成顶级目录列表(例如.max深度 1 或 2(,并让线程分别在每个分区上运行。这可以通过Linux的find -typde d
和/或Python的os.walk()
来完成。
我不会费心在内存中加载太多内容,只要您不知道它到底有多糟糕。也就是说,在你基准测试之前。另外,我认为os.walk
或文件列表不会成为严重的内存问题。如果我错了,请纠正我。