我正在尝试使用多处理器来加快数据处理速度。我的数据由3000json文件组成,我的代码如下:
def analyse_page(file, arg1, arg2, arg3, arg4):
with open(file) as f:
data = json.load(f)
for i in range(data):
data[i] = treat_item(data[i], arg1, arg2, arg3, arg4)
with open(output_json, 'w') as f:
json.dump(f,data)
for file in files:
analyse_page(file, arg1, arg2, arg3, arg4)
print('done!')
因此,我们的想法是处理json中的项,然后输出一个修改后的json。我发现我的计算机在一个简单的for循环中使用了15%的Cpu功率,所以我决定使用多处理器,但我遇到了一个无法理解的问题。我已经尝试过Process和Pool,无论是分块还是完全,但是,每次它总是可以处理三分之一的文件,然后脚本就停止了,不会出错!
因此,我使用if os.path.exists(): continue
再次启动代码,以便忽略处理过的文件。即使是这样,它也会处理另外三分之一的文件并停止。所以当我再次启动它时,它会进行另三次,然后打印done!
analyse_page
函数每页大约需要3秒,因此,在多处理中长时间启动相同函数的正确方法是什么?
更新,我已经做了什么:
处理
processes = []
for file in files:
p = multiprocessing.Process(target=analyse_page, args=(file, arg1, arg2, arg3, arg4,))
processes.append(p)
p.start()
for process in processes:
process.join()
使用批进行处理
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
processes = []
numberOfThreads = 6 #Max is 8
For file in files:
p = multiprocessing.Process(target=analyse_page, args=(file, arg1, arg2, arg3, arg4,))
processes.append(p)
for i in chunks(processes,numberOfThreads):
for j in i:
j.start()
for j in i:
j.join()
池
pool = multiprocessing.Pool(6)
For file in files:
pool.map(analyse_page, (file, arg1, arg2, arg3, arg4,))
pool.close()
Fo为方便处理多处理,您可以使用concurrent.forets模块。
Python文档:并发期货
在我解释每一个方面之前,有一个很棒且简单的视频教程,带有示例代码(易于改编(:
YouTube:多处理器教程
对于使用多个处理器或线程处理许多任务,我建议使用队列模块。
Python文档:队列
from queue import Queue
#Create Queue object
q = Queue()
#Put item to queue
q.put("my value")
#Get and process each item in queue and remove it
while not q.empty():
myValue = q.get()