How to store the outputs before multiprocessing has finished



I want to run multiprocessing in Python. Here is an example:

import multiprocessing as mp
import pandas as pd

def myFunction(name, age):
    output = paste(name, age)  # paste() is presumably a helper that returns a DataFrame
    return output

names = ["A", "B", "C"]
ages = ["1", "2", "3"]
with mp.Pool(processes=no_cpus) as pool:
    results = pool.starmap(myFunction, zip(names, ages))
results_table = pd.concat(results)
results_table.to_csv(file, sep="\t", index=False)

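In the real case, myFunction takes a very long time. Sometimes I have to interrupt the run and start again. However, results is only written to the output file once the whole pool.starmap call has finished. How can I store the intermediate results before it finishes? I do not want to change myFunction from return to .to_csv().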

Thanks!

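Instead of map, use the imap method. It returns an iterator that yields each result as it becomes available, that is, as soon as myFunction returns it. The results are still delivered in submission order, however. If you do not care about the order, use imap_unordered. Unlike starmap, imap passes a single argument to the worker, so myFunction has to accept a tuple and unpack it.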
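To make the ordering difference concrete, here is a minimal sketch. The worker sleepy_square and the helper collect are hypothetical names made up for this illustration; the sleep times are chosen so that later inputs finish first.

```python
import multiprocessing as mp
import time


def sleepy_square(x):
    """Hypothetical worker: larger inputs sleep less and so finish sooner."""
    time.sleep(0.1 * (3 - x))
    return x * x


def collect(method_name, values, processes=3):
    """Run sleepy_square over values using pool.imap or pool.imap_unordered."""
    with mp.Pool(processes=processes) as pool:
        method = getattr(pool, method_name)
        # Consume the iterator while the pool is still alive.
        return list(method(sleepy_square, values))


if __name__ == "__main__":
    # imap yields results in submission order, even if later tasks finish first.
    print(collect("imap", [0, 1, 2]))
    # imap_unordered yields results in completion order, which can vary.
    print(collect("imap_unordered", [0, 1, 2]))
```

With imap, a slow early task holds back results that are already done, so imap_unordered is likely the better fit when the goal is to get finished work onto disk as early as possible.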

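In the code below, each DataFrame is written to the CSV file as soon as it comes out of the iterator. The first result creates the file and writes the header row; every later result is appended without a header.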

import pandas as pd
import multiprocessing as mp

def paste(name, age):
    return pd.DataFrame([[name, age]], columns=['Name', 'Age'])

def myFunction(t):
    name, age = t  # unpack passed tuple
    output = paste(name, age)
    return output

# Required for Windows:
if __name__ == '__main__':
    names = ["A", "B", "C"]
    ages = ["1", "2", "3"]
    no_cpus = min(len(names), mp.cpu_count())
    csv_file = 'test.txt'
    with mp.Pool(processes=no_cpus) as pool:
        # Results from imap must be iterated
        for index, result in enumerate(pool.imap(myFunction, zip(names, ages))):
            if index == 0:
                # First return value: create the file and write the header
                header = True
                open_flags = "w"
            else:
                # Later return values: append without a header
                header = False
                open_flags = "a"
            with open(csv_file, open_flags, newline='') as f:
                result.to_csv(f, sep="\t", index=False, header=header)

Contents of test.txt:

Name    Age
A       1
B       2
C       3
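If the row order in the file does not matter, the same loop can be written with imap_unordered so that a slow early task does not hold back results that are already finished. The following is only a sketch under that assumption: slow_task and run_unordered are hypothetical names, and slow_task stands in for the real, long-running myFunction.

```python
import multiprocessing as mp

import pandas as pd


def slow_task(args):
    """Hypothetical stand-in for myFunction: returns a one-row DataFrame."""
    name, age = args  # imap_unordered passes one argument, so unpack the tuple
    return pd.DataFrame([[name, age]], columns=["Name", "Age"])


def run_unordered(names, ages, csv_file, processes=2):
    """Write each result to csv_file as soon as any worker finishes."""
    with mp.Pool(processes=processes) as pool:
        results = pool.imap_unordered(slow_task, zip(names, ages))
        for index, df in enumerate(results):
            first = index == 0
            # The first result overwrites the file and writes the header;
            # all later results are appended without a header.
            df.to_csv(csv_file, sep="\t", index=False,
                      mode="w" if first else "a", header=first)


if __name__ == "__main__":
    run_unordered(["A", "B", "C"], ["1", "2", "3"], "test_unordered.txt")
    print(pd.read_csv("test_unordered.txt", sep="\t"))
```

Because every result is written and the file closed before the next one is handled, an interrupted run leaves all completed rows on disk. The rows may appear in any order, so include an identifying column (here Name) if you need to tell which inputs were already processed.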
