如何使用joblib并行写入文件?JoinableQueue问题



我正试图将超过100k个文件的计算结果写入单个文件。处理一个文件大约需要1秒,并向输出文件写入一行。问题本身是"令人尴尬的平行",我只是在努力正确地写入文件(比如CSV(。以下是很久以前对我有用的东西(Python 3.4?(:

import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed
def save_to_file(q):
with open('test.csv', 'w') as out:
while True:
val = q.get()
if val is None: break
out.write(val + 'n')
q.task_done()
def process(x):
q.put(str(os.getpid()) + '-' + str(x**2))
if __name__ == '__main__':
q = JoinableQueue()
p = Process(target=save_to_file, args=(q,))
p.start()
Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
q.put(None) 
p.join() 

今天(在Python 3.6+上(,它产生了以下异常:

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""

如何使用joblib正确地写入单个文件?

事实证明,实现任务的一种方法是通过multiprocessing.Manager,如下所示:

import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed
def save_to_file(q):
with open('test.csv', 'w') as out:
while True:
val = q.get()
if val is None: break
out.write(val + 'n')
def process(x):
q.put(str(os.getpid()) + '-' + str(x**2))
if __name__ == '__main__':
m = Manager()
q = m.Queue()
p = Process(target=save_to_file, args=(q,))
p.start()
Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
q.put(None)
p.join()

我们让Manager管理上下文,其余部分保持不变(除了使用普通的Queue代替JoinableQueue(。

如果有人知道更好/更干净的方式,我会很乐意接受它作为答案。

相关内容

  • 没有找到相关文章

最新更新