我有以下代码运行速度非常慢。这是一个程序,可以拆分一个大文件(80gig),并将其放入树文件夹结构中进行快速查找。我在代码中做了一些注释,以帮助您理解它。
# Libraries
import os
# Variables
file="80_gig_file.txt"
outputdirectory="sorted"
depth=4 # This is the tree depth
# Preperations
os.makedirs(outputdirectory)
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
# Read file line by line and by not loading it entirely in memory
# Here it is possible to work with a queue I think, but how to do it properly without loading too much in memory?
with open(file) as infile:
for line in infile:
pipeline(line)
有没有一种方法可以使多线程工作?因为我自己尝试了几个我在网上找到的例子,它把所有的东西都放进了内存,导致我的电脑多次冻结。
首先,(IMO)最简单的解决方案
如果行看起来是完全独立的,只需将文件拆分为N个块,将文件名作为程序参数传递到打开的位置,然后运行当前脚本的多个实例,在多个命令行上手动启动它们。
优点:
- 无需深入研究多处理、进程间通信等
- 不需要对代码进行太多更改
缺点:
- 您需要对大文件进行预处理,将其拆分为块(尽管这将比您当前的执行时间快得多,因为您不会有每行打开-关闭的场景)
- 您需要自己启动进程,为每个进程传递适当的文件名
这将被实现为:
预处理:
APPROX_CHUNK_SIZE = 1e9 #1GB per file, adjust as needed
with open('big_file.txt') as fp:
chunk_id = 0
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
while next_chunk:
with open('big_file_{}.txt'.format(chunk_id), 'w') as ofp:
ofp.writelines(next_chunk)
chunk_id += 1
next_chunk = fp.readlines(APPROX_CHUNK_SIZE)
来自readlines
文档:
如果存在可选的sizehint参数,则不读取EOF,而是读取总计约sizehint字节的整行(可能在四舍五入到内部缓冲区大小之后)。
这样做不会确保所有块中的行数为偶数,但会使预处理速度更快,因为你是在块中读取,而不是逐行读取。根据需要调整区块大小。此外,请注意,通过使用readlines
,我们可以确保块之间不会出现断线,但由于函数返回一个行列表,我们使用writelines
将其写入输出文件(相当于在列表和ofp.write(line)
上循环)。为了完整起见,让我注意到,您还可以连接内存中的所有字符串,并只调用write
一次(即调用ofp.write(''.join(next_chunk))
),这可能会给您带来一些(微小的)性能优势,但会带来(高得多的)RAM使用率。
主要脚本:
你只需要在最上面进行修改:
import sys
file=sys.argv[1]
... # rest of your script here
通过使用argv
,您可以将命令行参数传递给程序(在本例中,是要打开的文件)。然后,只需将脚本运行为:
python process_the_file.py big_file_0.txt
这将运行一个进程。打开多个终端并为每个终端运行相同的命令big_file_N.txt
,它们将相互独立。
注意:我使用argv[1]
,因为对于所有程序,argv
的第一个值(即argv[0]
)始终是程序名称。
然后,multiprocessing
溶液
第一个解决方案虽然有效,但并不十分优雅,特别是因为如果从80GB大小的文件开始,您将有80个文件。
一个更干净的解决方案是使用python的multiprocessing
模块(重要的是:不要threading
!如果你不知道区别,请查找"全局解释器锁",以及为什么python中的多线程不能像你想象的那样工作)。
这个想法是有一个";生产者";打开大文件并不断将其中的行放入队列的过程。然后,一池";消费者;从队列中提取行并进行处理的进程。
优点:
- 一个脚本可以完成所有操作
- 无需打开多个终端并进行打字
缺点:
- 复杂性
- 使用进程间通信,这有一些开销
这将按如下方式实现:
# Libraries
import os
import multiprocessing
outputdirectory="sorted"
depth=4 # This is the tree depth
# Process each line in the file
def pipeline(line):
# Strip symbols from line
line_stripped=''.join(e for e in line if e.isalnum())
# Reverse the line
line_stripped_reversed=line_stripped[::-1]
file=outputdirectory
# Create path location in folderbased tree
for i in range(min((depth),len(line_stripped))):
file=os.path.join(file,line_stripped_reversed[i])
# Create folders if they don't exist
os.makedirs(os.path.dirname(file), exist_ok=True)
# Name the file, with "-file"
file=file+"-file"
# This is the operation that slows everything down.
# It opens, writes and closes a lot of small files.
# I cannot keep them open because currently half a million possibilities (and thus files) are worst case open (n=26^4).
f = open(file, "a")
f.write(line)
f.close()
if __name__ == '__main__':
# Variables
file="80_gig_file.txt"
# Preperations
os.makedirs(outputdirectory)
pool = multiprocessing.Pool() # by default, 1 process per CPU
LINES_PER_PROCESS = 1000 # adapt as needed. Higher is better, but consumes more RAM
with open(file) as infile:
next(pool.imap(pipeline, infile, LINES_PER_PROCESS))
pool.close()
pool.join()
CCD_ 15行是将在每个进程上运行的代码与仅在";父亲";。每个进程都定义了pipeline
,但实际上只有父进程产生了一个工作者池并应用了该函数。你可以在这里找到更多关于multiprocessing.map
的详细信息
编辑:
增加了关闭和加入池,以防止主进程退出并杀死进程中的孩子。