读取和处理一个巨大的文件(对于内存来说太大)时执行多线程处理



我有以下代码运行速度非常慢。这是一个程序,可以拆分一个大文件(80gig),并将其放入树文件夹结构中进行快速查找。我在代码中做了一些注释,以帮助您理解它。

# Libraries
import os

# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth

# Preperations
os.makedirs(outputdirectory)
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down. 
# It opens, writes and closes a lot of small files. 
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()

# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
for line in infile:
pipeline(line)

有没有一种方法可以使多线程工作?因为我自己尝试了几个我在网上找到的例子,它把所有的东西都放进了内存,导致我的电脑多次冻结。

首先,(IMO)最简单的解决方案

如果行看起来是完全独立的,只需将文件拆分为N个块,将文件名作为程序参数传递到打开的位置,然后运行当前脚本的多个实例,在多个命令行上手动启动它们。

优点:

  • 无需深入研究多处理、进程间通信等
  • 不需要对代码进行太多更改

缺点:

  • 您需要对大文件进行预处理,将其拆分为块(尽管这将比您当前的执行时间快得多,因为您不会有每行打开-关闭的场景)
  • 您需要自己启动进程,为每个进程传递适当的文件名

这将被实现为:

预处理:

APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
chunk_id = 0
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
while next_chunk:
with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
ofp.writelines(next_chunk)
chunk_id += 1
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)

来自readlines文档:

如果存在可选的sizehint参数,则不读取EOF,而是读取总计约sizehint字节的整行(可能在四舍五入到内部缓冲区大小之后)。

这样做不会确保所有块中的行数为偶数,但会使预处理速度更快,因为你是在块中读取,而不是逐行读取。根据需要调整区块大小。此外,请注意,通过使用readlines,我们可以确保块之间不会出现断线,但由于函数返回一个行列表,我们使用writelines将其写入输出文件(相当于在列表和ofp.write(line)上循环)。为了完整起见,让我注意到,您还可以连接内存中的所有字符串,并只调用write一次(即调用ofp.write(''.join(next_chunk))),这可能会给您带来一些(微小的)性能优势,但会带来(高得多的)RAM使用率。

主要脚本:

你只需要在最上面进行修改:

import sys
file=sys.argv[1]
... # rest of your script here

通过使用argv,您可以将命令行参数传递给程序(在本例中,是要打开的文件)。然后,只需将脚本运行为:

python process_the_file.py big_file_0.txt

这将运行一个进程。打开多个终端并为每个终端运行相同的命令big_file_N.txt,它们将相互独立。

注意:我使用argv[1],因为对于所有程序,argv的第一个值(即argv[0])始终是程序名称。


然后,multiprocessing溶液

第一个解决方案虽然有效,但并不十分优雅,特别是因为如果从80GB大小的文件开始,您将有80个文件。

一个更干净的解决方案是使用python的multiprocessing模块(重要的是:不要threading!如果你不知道区别,请查找"全局解释器锁",以及为什么python中的多线程不能像你想象的那样工作)。

这个想法是有一个";生产者";打开大文件并不断将其中的行放入队列的过程。然后,一池";消费者;从队列中提取行并进行处理的进程。

优点:

  • 一个脚本可以完成所有操作
  • 无需打开多个终端并进行打字

缺点:

  • 复杂性
  • 使用进程间通信,这有一些开销

这将按如下方式实现:

# Libraries
import os
import multiprocessing
outputdirectory="sorted"
depth=4 # This is the tree depth
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down. 
# It opens, writes and closes a lot of small files. 
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
if __name__ == '__main__':
# Variables
file="80_gig_file.txt"
# Preperations
os.makedirs(outputdirectory)
pool = multiprocessing.Pool() # by default, 1 process per CPU
LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM
with open(file) as infile:
next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
pool.close()
pool.join()

CCD_ 15行是将在每个进程上运行的代码与仅在";父亲";。每个进程都定义了pipeline,但实际上只有父进程产生了一个工作者池并应用了该函数。你可以在这里找到更多关于multiprocessing.map的详细信息

编辑:

增加了关闭和加入池,以防止主进程退出并杀死进程中的孩子。

最新更新