I need to process several thousand files and would like to use parallel processing to save some time.
import os
import re
import csv
import numpy as np

rawdata = "/content/drive/My Drive/somepath"
outfolder = "/content/drive/My Drive/somepath2"

if not os.path.exists(outfolder):
    os.makedirs(outfolder)

for dirpath, dirnames, filenames in os.walk(rawdata):
    for file in sorted(filenames):
        filename = os.path.join(os.path.abspath(dirpath), file)
        fileno = re.search(r'(.*?)\.', file).group(1)
        print("check " + fileno)
        if not os.path.exists(outfolder+'/'+fileno+'.csv'):
            print("Processing " + filename)
            # run a bunch of stuff
            with open(outfolder+'/'+fileno+'.csv', 'w', newline='') as myfile:
                wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
                wr.writerow(a bunch of stuff)

print('\n job done!!')
I tried multiprocessing as shown below. The code doesn't raise any errors, but somehow it has been working on the same file for 20 minutes, while each file normally takes about 2 minutes. Any help is much appreciated.
import multiprocessing as mp

def process_file(file):
    fileno = re.search(r'(.*?)\.', file).group(1)
    print("check " + fileno)
    if not os.path.exists(outfolder+'/'+fileno+'.csv'):
        print("Processing " + filename)
        # run a bunch of stuff
        with open(outfolder+'/'+fileno+'.csv', 'w', newline='') as myfile:
            wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
            wr.writerow(a bunch of stuff)

pool = mp.Pool(4)
for dirpath, dirnames, filenames in os.walk(rawdata):
    for file in sorted(filenames):
        pool.apply_async(process_file, [file])
pool.close()
pool.join()

print('\n job done!!')
Try this: define process_file at module level and move the pool setup into a main() function guarded by if __name__ == "__main__":
import multiprocessing as mp

def process_file(file):
    fileno = re.search(r'(.*?)\.', file).group(1)
    print("check " + fileno)
    if not os.path.exists(outfolder+'/'+fileno+'.csv'):
        print("Processing " + file)
        # run a bunch of stuff
        with open(outfolder+'/'+fileno+'.csv', 'w', newline='') as myfile:
            wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
            wr.writerow(a bunch of stuff)

def main():
    pool = mp.Pool(4)
    for dirpath, dirnames, filenames in os.walk(rawdata):
        for file in sorted(filenames):
            pool.apply_async(process_file, [file])
    pool.close()
    pool.join()
    print('\n job done!!')

if __name__ == "__main__":
    main()
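One more note on the apply_async approach: exceptions raised inside the workers are silently swallowed unless you keep the AsyncResult objects and call get() on them, which is why the original version appeared to run without errors. A minimal sketch of that, reusing the same process_file (collecting the handles into a results list is my addition, not part of the answer above):

def main():
    pool = mp.Pool(4)
    results = []
    for dirpath, dirnames, filenames in os.walk(rawdata):
        for file in sorted(filenames):
            # keep the handle so worker errors can be re-raised later
            results.append(pool.apply_async(process_file, [file]))
    pool.close()
    pool.join()
    for r in results:
        r.get()  # re-raises any exception that happened in the worker
    print('\n job done!!')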
I think this will work:
files = []
for dirpath, dirnames, filenames in os.walk(rawdata):
    files.extend(filenames)

with mp.Pool(4) as p:
    p.map(process_file, files)
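One caveat with this version, assuming process_file opens the file by the name it receives: files.extend(filenames) collects bare file names and drops the directory, so workers can only find files that happen to be in the current working directory. A sketch that keeps the full paths instead (same idea, just joining dirpath back in):

files = []
for dirpath, dirnames, filenames in os.walk(rawdata):
    # keep the directory so workers can open files outside the cwd
    files.extend(os.path.join(dirpath, f) for f in sorted(filenames))

with mp.Pool(4) as p:
    p.map(process_file, files)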
This should get you closer to a solution by catching and reporting errors and adding some timing information to the output. The example uses imap_unordered with a chunksize of 1 so that each worker picks up one new work item at a time, which works well when each work item involves a substantial amount of processing.
import multiprocessing as mp
import time
import traceback

def process_file(file):
    try:
        start = time.time()
        fileno = re.search(r'(.*?)\.', file).group(1)
        print("check " + fileno)
        if not os.path.exists(outfolder+'/'+fileno+'.csv'):
            print("Processing " + file)
            # run a bunch of stuff
            with open(outfolder+'/'+fileno+'.csv', 'w', newline='') as myfile:
                wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
                wr.writerow(a bunch of stuff)
        return "Success: {}, {} seconds".format(file, time.time()-start)
    except Exception:
        return "Error: {}\n{}\n{} seconds".format(file, traceback.format_exc(), time.time()-start)

if __name__ == "__main__":
    files_to_process = [os.path.join(dirpath, filename)
                        for dirpath, dirnames, filenames in os.walk(rawdata)
                        for filename in filenames]
    with mp.Pool(4) as pool:
        start = time.time()
        for result in pool.imap_unordered(process_file, files_to_process, chunksize=1):
            print("{}: {}".format(time.time()-start, result))
    print('\n job done!!')
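A note on the design choice: imap_unordered yields each result as soon as its worker finishes, so the printed order will not match the input order. If ordering matters, the pool's imap method takes the same arguments and preserves input order at the cost of waiting on slower items; the only change would be the loop line:

    # same loop as above, but results come back in submission order
    for result in pool.imap(process_file, files_to_process, chunksize=1):
        print("{}: {}".format(time.time()-start, result))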