Merging two huge files by blocks in Python



Description and example improved after Vlad's comment. I have two files (A and B) that I need to merge into a third file C in the following way:

  • take one line from A and put it into C
  • take a few lines from B and put them into C
  • continue until all the lines of A and B are in C

To test the code I made up an example and created a list of tuples, where the first element is a list of line numbers and the second is the file those lines should be read from. I would like to use multiprocessing to perform the reading in order to speed up the computation.
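For concreteness, a single block is just a pair of a line-number list and the file to read from; here is a minimal, purely illustrative sketch (the full script below builds the real list of blocks):

# Illustrative only: one block pairs 0-based line numbers with the open file
# those lines should be read from.
with open('file_A.txt') as file_A:
    block = ([4, 5, 7, 8], file_A)   # i.e. "lines [4, 5, 7, 8] for file_A.txt"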

Here is an example of the expected result:

file_A.txt

A0
A1
A2
A3
A4
A5
A6
A7
A8
A9

file_B.txt

B0
B1
B2
B3
B4
B5
B6
B7
B8
B9

The blocks are the following:

lines [9] for file_B.txt
lines [4] for file_B.txt
lines [1] for file_A.txt
lines [4, 5, 7, 8] for file_A.txt
lines [3, 9] for file_A.txt
lines [6] for file_B.txt
lines [8] for file_B.txt
lines [0, 2, 3, 5] for file_B.txt
lines [0, 2, 6] for file_A.txt
lines [1, 7] for file_B.txt

file_C.txt

B9
B4
A1
A4
A5
A7
A8
A3
A9
B6
B8
B0
B2
B3
B5
A0
A2
A6
B1
B7

Here is an example of the code:

from concurrent import futures
import os
from itertools import islice
from random import randint, shuffle
import time
import cProfile

def read(block):
    # Scan the whole file object and yield only the requested line numbers,
    # then rewind so the same handle can be reused by another block.
    linenums = block[0]
    f = block[1]
    for i, line in enumerate(f):
        if i in linenums:
            yield line
    f.seek(0)

def random_chunk(li, min_chunk=1, max_chunk=3):
    it = iter(li)
    while True:
        nxt = sorted(list(islice(it, randint(min_chunk, max_chunk))))
        if nxt:
            yield nxt
        else:
            break

def random_partition(N, m=1, M=3):
    # Split the line numbers 0..N-1 into randomly sized, randomly ordered chunks.
    L = list(range(N))
    shuffle(L)
    M = randint(1, N)
    return list(random_chunk(L, m, M))

if __name__ == '__main__':

    if os.path.exists('file_C.txt'):
        os.remove('file_C.txt')
    N = 10
    with open('file_A.txt', 'w') as file_A:
        for i in range(N):
            file_A.write("A" + str(i) + "\n")
    with open('file_B.txt', 'w') as file_B:
        for i in range(N):
            file_B.write("B" + str(i) + "\n")
    print('files created')
    with open('file_A.txt') as file_A, open('file_B.txt') as file_B, open('file_C.txt', 'w') as file_C:
        partitions_A = random_partition(N)
        L_A = [(p, file_A) for p in partitions_A]
        partitions_B = random_partition(N)
        L_B = [(p, file_B) for p in partitions_B]
        L = L_A + L_B
        shuffle(L)
        for el in L:
            print(f"lines {el[0]} for {el[1].name}")
        print('shuffle')

        pr = cProfile.Profile()
        pr.enable()
        with futures.ThreadPoolExecutor(8) as executor:
            # schedule one map/worker for each block in the original data
            q = executor.map(read, [block for block in L])
            file_C.writelines([el for x in q for el in x])
        pr.disable()
        pr.print_stats(sort='tottime')

The problem is that for large values of N (1,000,000) the code takes a huge amount of time on my laptop, and in the real use case I will need to apply it to files with millions of lines (up to 20 GB each). I profiled a run with N = 100000 and obtained the following results:

202691 function calls (202669 primitive calls) in 90.688 seconds
Ordered by: internal time
ncalls  tottime  percall  cumtime  percall filename:lineno(function)
200006   90.311    0.000   90.320    0.000 test.py:8(read)
1    0.350    0.350   90.671   90.671 test.py:62(<listcomp>)
1    0.010    0.010    0.010    0.010 {method 'writelines' of '_io._IOBase' objects}
688    0.005    0.000    0.009    0.000 codecs.py:319(decode)
688    0.003    0.000    0.003    0.000 {built-in method _codecs.utf_8_decode}
3    0.002    0.001    0.002    0.001 {built-in method marshal.loads}
27    0.001    0.000    0.001    0.000 {method 'acquire' of '_thread.lock' objects}

Most of the time is spent in the reading part.

Do you have any suggestions on how to improve the code? Is there a better strategy to tackle this problem?

I may be completely wrong here, but you could try the Python linecache module described here. I don't know whether it reads ahead, how much it reads ahead, or what its cache-eviction policy is.

I did a quick experiment looking up a million random lines in /usr/share/dict/words on my Mac, which has about 250,000 lines:

import random
import linecache
In [18]: %%timeit
...:     for lookup in range(1000000):
...:         lineNumber = random.randint(1,250000)
...:         line = linecache.getline('/usr/share/dict/words', lineNumber)
...: 
608 ms ± 8.27 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
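
As a rough, unbenchmarked sketch, the same idea could plug into the block structure from the question, assuming the blocks carry file names rather than open handles; note that linecache.getline() uses 1-based line numbers and caches all of a file's lines in memory, which is worth keeping in mind for the 20 GB case:

import linecache

def read_block(block):
    # block = (list of 0-based line numbers, file name); linecache is 1-based.
    linenums, filename = block
    return [linecache.getline(filename, n + 1) for n in linenums]

# Same shape as the blocks in the question, but with names instead of handles.
blocks = [([9], 'file_B.txt'), ([4, 5, 7, 8], 'file_A.txt')]

with open('file_C.txt', 'w') as file_C:
    for block in blocks:
        file_C.writelines(read_block(block))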
