Python 多处理:写入同一个 Excel 文件



我是Python的新手,我正在尝试将五个不同进程的结果保存到一个excel文件中(每个进程写入不同的工作表)。我在这里阅读了不同的帖子,但仍然无法完成,因为我对 pool.map、队列和锁感到非常困惑,而且我不确定这里需要什么来完成这项任务。 这是我到目前为止的代码:

list_of_days = ["2017.03.20", "2017.03.21", "2017.03.22", "2017.03.23", "2017.03.24"]
results = pd.DataFrame()
if __name__ == '__main__':
global list_of_days
writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
nr_of_cores = multiprocessing.cpu_count()
l = multiprocessing.Lock()
pool = multiprocessing.Pool(processes=nr_of_cores, initializer=init, initargs=(l,))
pool.map(f, range(len(list_of_days)))
pool.close()
pool.join()
def init(l):
global lock
lock = l
def f(k):
global results
*** DO SOME STUFF HERE***
results = results[ *** finished pandas dataframe *** ]
lock.acquire()
results.to_excel(writer, sheet_name=list_of_days[k])
writer.save()
lock.release()

结果是只在 excel 中创建一张工作表(我假设它是最后完成的过程)。关于此代码的一些问题:

  • 如何避免定义全局变量?
  • 甚至可以传递数据帧吗?
  • 我应该将锁定移动到主锁吗?

非常感谢这里的一些输入,因为我认为掌握多处理是工具性的。谢谢

1)为什么在第二种方法中在多个地方实现time.sleep?

__main__中,time.sleep(0.1),给启动process一个启动的时间片。
f2(fq, q)中,为queue提供一个将所有缓冲数据刷新到管道的时间片,以及 如使用q.get_nowait()
w(q)中,仅用于测试模拟长时间运行的writer.to_excel(...), 我删除了这个。

2) pool.map 和 pool = [mp.进程( . )]?

使用pool.map不需要Queue,不需要传递参数,代码更短。worker_process必须立即返回result并终止。 只要完成所有iterationpool.map就会启动一个新过程。results必须在之后进行处理。

使用pool = [mp.Process( . )],开始nprocessesprocessqueue.Empty终止

你能想到一种情况,你更喜欢一种方法而不是另一种方法吗?

方法1:快速设置,序列化,只对结果感兴趣才能继续。
方法 2:如果要并行执行所有工作负载。


不能在进程中使用global writer
writer实例必须属于一个process

mp.Pool的用法,例如:

def f1(k):
# *** DO SOME STUFF HERE***
results = pd.DataFrame(df_)
return results
if __name__ == '__main__':
pool = mp.Pool()
results = pool.map(f1, range(len(list_of_days)))
writer = pd.ExcelWriter('../test/myfile.xlsx', engine='xlsxwriter')
for k, result in enumerate(results):
result.to_excel(writer, sheet_name=list_of_days[k])
writer.save()
pool.close()

这导致.to_excel(...)__main__过程中按顺序调用。


如果你想要并行.to_excel(...)你必须使用mp.Queue()
例如:

worker过程:

# mp.Queue exeptions have to load from
try:
# Python3
import queue
except:
# Python 2
import Queue as queue
def f2(fq, q):
while True:
try:
k = fq.get_nowait()
except queue.Empty:
exit(0)
# *** DO SOME STUFF HERE***
results = pd.DataFrame(df_)
q.put( (list_of_days[k], results) )
time.sleep(0.1)  

writer过程:

def w(q):
writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
while True:
try:
titel, result = q.get()
except ValueError:
writer.save()
exit(0)
result.to_excel(writer, sheet_name=titel)

__main__流程:

if __name__ == '__main__':
w_q = mp.Queue()
w_p = mp.Process(target=w, args=(w_q,))
w_p.start()
time.sleep(0.1)
f_q = mp.Queue()
for i in range(len(list_of_days)):
f_q.put(i)
pool = [mp.Process(target=f2, args=(f_q, w_q,)) for p in range(os.cpu_count())]
for p in pool:
p.start()
time.sleep(0.1)
for p in pool:
p.join()
w_q.put('STOP')
w_p.join()

用 Python 测试:3.4.2 - 熊猫:0.19.2 - xlsxwriter:0.9.6

相关内容

  • 没有找到相关文章

最新更新