I'm trying to write the results of a computation over more than 100k files into a single output file. Processing one file takes about 1 second and produces one line of output. The problem itself is "embarrassingly parallel"; I'm just struggling to write to the output file (say, a CSV) correctly. Here is what worked for me a long time ago (Python 3.4?):
import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
            q.task_done()

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None)
    p.join()
Today (on Python 3.6+) it raises the following exception:
joblib.externals.loky.process_executor._RemoteTraceback:
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""
How do I correctly write to a single file from joblib workers?
It turns out that one way to accomplish this is via multiprocessing.Manager, like so:
import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed

def save_to_file(q):
    # Single writer process: drain the queue until the None sentinel arrives.
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    m = Manager()
    q = m.Queue()  # a picklable queue proxy, unlike JoinableQueue
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None)  # tell the writer process to finish
    p.join()
We let the Manager own the queue: its Queue() returns a proxy object that can be pickled and handed to the worker processes, which is exactly what a plain JoinableQueue cannot do. Everything else stays the same (except that an ordinary Queue replaces the JoinableQueue, so there is no task_done() call).
If anyone knows a better/cleaner way, I'd be happy to accept it as the answer.
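Not part of the original answer, but since Parallel already collects the return values of the workers in order, one arguably simpler variant is to have each call return its line and let the parent process do all the writing, dropping the queue and the extra writer process entirely. This is only a sketch, assuming all result lines (here 100k short strings) comfortably fit in memory; the names process and test.csv just mirror the snippets above:

import os
from joblib import Parallel, delayed

def process(x):
    # Return the line instead of pushing it onto a queue.
    return str(os.getpid()) + '-' + str(x**2)

if __name__ == '__main__':
    # Parallel gathers the results as a list, in input order.
    lines = Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    with open('test.csv', 'w') as out:
        out.write('\n'.join(lines) + '\n')

The trade-off is buffering everything before writing, whereas the queue-based version streams lines to disk as they are produced.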