I have a file with 500,000 lines (input.txt), and I want to encrypt those lines with an encrypt function and save them to a file named output.txt. For example, if input.txt is
aab
abb
abc
then I want my output.txt to be
001
011
012
Simple loop version
I have a working for loop, but encrypting all the lines takes nearly 9 hours:
encryption_map = {}
encryption_map['a']=0
encryption_map['b']=1
encryption_map['c']=2

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]: # remove line break symbol \n
            output_int += str(encryption_map[ch])
    return output_int

text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
    with open('output.txt', 'w') as output_file:
        for l in lines:
            output_int = encrypt(l)
            output_file.write(output_int + '\n')
apply_async version
Since I want to keep the same order in output.txt, it seems I have to use apply_async. My code then becomes:
import multiprocessing as mp

encryption_map = {}
encryption_map['a']=0
encryption_map['b']=1
encryption_map['c']=2

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]: # remove line break symbol \n
            output_int += str(encryption_map[ch])
    return output_int

def write_result(output):
    output_file.write(ipa_output + '\n')
    # output_file.flush() # This line is suggested by another stack question

pool = mp.Pool(20)
text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
    with open('output.txt', 'w') as output_file:
        for l in lines:
            pool.apply_async(encrypt, args=l, callback=write_result)
pool.close()
pool.join()
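It runs much faster, but output.txt is always empty. What is wrong with my code? I found a post where someone also had trouble writing to a file; they suggested putting f.flush() inside the write function, but that does not work either.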
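You need to pass the arguments as a tuple, args=(line,). With args=l the string itself is unpacked as the argument list, so encrypt is called with one argument per character and fails inside the worker. Because no error_callback is set and the result is never fetched, the exception goes unreported and write_result is never called. The corrected code: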
import multiprocessing as mp

encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2

output_file = open('output.txt', 'w')

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:
            output_int += str(encryption_map[ch])
    return output_int

def write_result(output):
    output_file.write(output + '\n')

def main():
    #mp.set_start_method('spawn') # Only needed on OSX
    pool = mp.Pool(2)
    with open('input.txt') as input_file:
        lines = input_file.readlines()
        for line in lines:
            pool.apply_async(encrypt, args=(line,), callback=write_result)
    pool.close()
    pool.join()
    output_file.close()

if __name__ == '__main__':
    main()
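To see why the original call produced no output, it can help to attach an error_callback. The following is only a minimal sketch: the helper name run is made up for illustration, encrypt is a shortened rewrite of the function above, and multiprocessing.pool.ThreadPool is used in place of mp.Pool (it exposes the same apply_async interface) purely so the snippet runs without an if __name__ == '__main__' guard.

```python
from multiprocessing.pool import ThreadPool

encryption_map = {'a': 0, 'b': 1, 'c': 2}

def encrypt(input_str):
    # Shortened rewrite of the encrypt function from the question.
    return ''.join(str(encryption_map[ch]) for ch in input_str.rstrip('\n'))

def run(args):
    """Hypothetical helper: submit one task, collect what each callback receives."""
    results, errors = [], []
    pool = ThreadPool(1)  # stand-in for mp.Pool, same apply_async API
    async_result = pool.apply_async(
        encrypt,
        args=args,
        callback=results.append,        # called only on success
        error_callback=errors.append,   # called only on failure
    )
    async_result.wait()
    pool.close()
    pool.join()
    return results, errors

# A bare string is unpacked into one argument per character, so encrypt fails.
print(run('aab\n'))
# A one-element tuple passes the whole line as a single argument.
print(run(('aab\n',)))
```

With the bare string, results stays empty and errors holds a TypeError; with the tuple, results holds '001'.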
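Edit:
In the code above, because we use apply_async, the order of the lines in the output may differ from the order in the input.
If we want to preserve the order, we can use map/map_async/imap.
In this case imap is probably the best choice, because the callback operation (I/O-bound) is much slower than the worker operation (CPU-bound):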
import multiprocessing as mp

encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2

output_file = open('output.txt', 'w')

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:
            output_int += str(encryption_map[ch])
    return output_int

def main():
    mp.set_start_method('spawn') # Only needed on OSX
    pool = mp.Pool(2)
    with open('input.txt') as input_file:
        lines = input_file.readlines()
        # imap yields results in the same order as the input lines
        for output in pool.imap(encrypt, lines):
            output_file.write(output + '\n')
    pool.close()
    pool.join()
    output_file.close()  # flush buffered output to disk

if __name__ == '__main__':
    main()
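With 500,000 very short lines, sending one line per task may spend more time on task dispatch than on encryption. imap accepts a chunksize argument that batches several items into one task while still yielding results in input order. The sketch below is an illustration under assumptions, not a measured recommendation: encrypt_in_order is a hypothetical helper, the default chunksize of 1000 is an arbitrary guess to tune, encrypt is a shortened rewrite, and ThreadPool is used only so the demo runs without a __main__ guard.

```python
from multiprocessing.pool import ThreadPool

encryption_map = {'a': 0, 'b': 1, 'c': 2}

def encrypt(line):
    # Map each character of the line (minus its trailing newline) to a digit.
    return ''.join(str(encryption_map[ch]) for ch in line.rstrip('\n'))

def encrypt_in_order(pool, lines, chunksize=1000):
    # Hypothetical helper. imap returns results in input order;
    # chunksize sends lines to the workers in batches instead of one by one.
    return list(pool.imap(encrypt, lines, chunksize=chunksize))

# ThreadPool shares Pool's imap interface and keeps this demo guard-free.
pool = ThreadPool(2)
try:
    demo = encrypt_in_order(pool, ['aab\n', 'abb\n', 'abc\n'], chunksize=2)
finally:
    pool.close()
    pool.join()

print(demo)  # ['001', '011', '012']
```

In the real script, the same helper could be handed an mp.Pool created inside main(), with the returned strings written to output.txt in a loop.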