Multiprocessing always returns an empty file when using apply_async



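I have a file with 500,000 lines (input.txt). I want to encrypt each line with an encrypt function and save the results to a file named output.txt. For example, given this input.txt: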

aab
abb
abc

I want output.txt to be:

001
011
012

Simple loop version

I have a working for loop, but encrypting all the lines takes nearly 9 hours:

encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:  # remove line break symbol \n
            output_int += str(encryption_map[ch])
    return output_int

text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
with open('output.txt', 'w') as output_file:
    for l in lines:
        output_int = encrypt(l)
        output_file.write(output_int + '\n')

apply_async version

Since I want to keep the same ordering in output.txt, it seems I have to use apply_async. My code then becomes:

import multiprocessing as mp

encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:  # remove line break symbol \n
            output_int += str(encryption_map[ch])
    return output_int

def write_result(output):
    output_file.write(ipa_output + '\n')
    # output_file.flush() # This line is suggested by another stack question

pool = mp.Pool(20)
text_path = 'input.txt'
with open(text_path, 'r') as input_file:
    lines = input_file.readlines()
with open('output.txt', 'w') as output_file:
    for l in lines:
        pool.apply_async(encrypt, args=l, callback=write_result)
    pool.close()
    pool.join()

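It runs much faster, but output.txt is always empty. What is wrong with my code? I found a post where someone also had trouble writing to a file; the suggestion there was to call f.flush() in the write function, but that did not work either.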

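You need to pass the arguments as a tuple: args=(line,). With args=l, apply_async unpacks the string into one argument per character, so encrypt raises a TypeError inside the worker and the callback is never called. The callback should also write its own parameter, output, rather than the undefined name ipa_output: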

import multiprocessing as mp

encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2

# Opened at module level so that the callback below can reach it.
output_file = open('output.txt', 'w')

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:  # skip the line break
            output_int += str(encryption_map[ch])
    return output_int

def write_result(output):
    # Runs in the parent process each time a worker returns a result.
    output_file.write(output + '\n')

def main():
    # mp.set_start_method('spawn')  # Only needed on OSX
    pool = mp.Pool(2)
    with open('input.txt') as input_file:
        lines = input_file.readlines()
    for line in lines:
        # args must be a tuple, hence the trailing comma.
        pool.apply_async(encrypt, args=(line,), callback=write_result)
    pool.close()
    pool.join()
    output_file.close()

if __name__ == '__main__':
    main()
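
To see why the original version produced an empty file without any visible error, it can help to surface the exception raised in the worker. The following is a minimal sketch, not part of the original answer: call_like_apply_async and report_error are hypothetical helper names, and encrypt is a compact rewrite of the function above. It relies on the fact that apply_async calls func(*args), and on the error_callback parameter that apply_async accepts.

```python
import multiprocessing as mp

encryption_map = {'a': 0, 'b': 1, 'c': 2}

def encrypt(input_str):
    # Map each character to its digit, ignoring the trailing newline.
    return ''.join(str(encryption_map[ch]) for ch in input_str.rstrip('\n'))

def call_like_apply_async(func, args):
    # Hypothetical helper: apply_async ends up calling func(*args) in a
    # worker; doing the same in-process makes the failure easy to see.
    try:
        return func(*args)
    except TypeError as exc:
        return exc

def report_error(exc):
    # Hypothetical error_callback: show the exception raised in the worker.
    print('worker failed:', repr(exc))

def main():
    line = 'aab\n'
    print(call_like_apply_async(encrypt, line))     # bare string: TypeError
    print(call_like_apply_async(encrypt, (line,)))  # one-element tuple: works

    with mp.Pool(2) as pool:
        # Same mistake as in the question, but the error is now reported.
        pool.apply_async(encrypt, args=line, callback=print,
                         error_callback=report_error)
        # Corrected call: the result reaches the callback.
        pool.apply_async(encrypt, args=(line,), callback=print,
                         error_callback=report_error)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
```

Without error_callback, the exception is stored on the AsyncResult and only raised if you call .get() on it, which is why the question's script finished quietly.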

Edit

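In the code above, because we use apply_async, the order of the lines in the output may differ from the order in the input.
If we want to preserve the order, we can use map/map_async/imap.
In this case imap is probably the best choice, because the callback operation (IO-bound) is much slower than the worker operation (CPU-bound):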

import multiprocessing as mp

encryption_map = {}
encryption_map['a'] = 0
encryption_map['b'] = 1
encryption_map['c'] = 2

def encrypt(input_str):
    output_int = ''
    for i in input_str:
        for ch in i.split('\n')[0]:  # skip the line break
            output_int += str(encryption_map[ch])
    return output_int

def main():
    mp.set_start_method('spawn')  # Only needed on OSX
    pool = mp.Pool(2)
    with open('input.txt') as input_file:
        lines = input_file.readlines()
    # Open the output inside main() so that spawned workers, which re-import
    # this module, do not reopen and truncate output.txt.
    with open('output.txt', 'w') as output_file:
        # imap yields results in the same order as the input lines.
        for output in pool.imap(encrypt, lines):
            output_file.write(output + '\n')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
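
With 500,000 very short lines, sending one line per task may let inter-process overhead dominate. As a hedged sketch, imap accepts a chunksize argument that batches several lines per task while still yielding results in input order. The helper name encrypt_file, the value 1000, and the use of str.translate are assumptions for illustration and do not come from the original answer. Note also that translate leaves unmapped characters unchanged instead of raising KeyError.

```python
import multiprocessing as mp
import os
import tempfile

# Translation table for the same a->0, b->1, c->2 mapping.
TABLE = str.maketrans({'a': '0', 'b': '1', 'c': '2'})

def encrypt(line):
    # Strip the trailing newline, then translate all characters in one call.
    return line.rstrip('\n').translate(TABLE)

def encrypt_file(src, dst, processes=2, chunksize=1000):
    # Hypothetical helper: imap yields results in input order, and chunksize
    # groups lines into batches so fewer messages cross process boundaries.
    with open(src) as input_file, open(dst, 'w') as output_file, \
            mp.Pool(processes) as pool:
        for output in pool.imap(encrypt, input_file, chunksize=chunksize):
            output_file.write(output + '\n')

if __name__ == '__main__':
    # Small self-contained demo using the sample lines from the question.
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, 'input.txt')
        dst = os.path.join(tmp, 'output.txt')
        with open(src, 'w') as f:
            f.write('aab\nabb\nabc\n')
        encrypt_file(src, dst)
        with open(dst) as f:
            print(f.read(), end='')
```

A suitable chunksize depends on line length and core count, so it is worth timing a few values on a slice of the real file.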
