用Python中的多处理保存不同进程生成的文件最安全的方法是什么



我对使用多处理包完全陌生。我已经建立了一个基于代理的模型,并希望并行运行大量具有不同参数的模拟。我的模型采用一个xml文件,提取一些参数并运行模拟,然后生成两个panda数据帧并将其保存为pickle文件。我正在尝试使用多处理。Process((类,但这两个数据帧没有正确保存,相反,对于某些模拟,我只得到一个数据帧,而对于其他模拟,则没有数据帧。我在这类工作中使用的是正确的类吗?使用多处理模块将模拟结果写入磁盘最安全的方法是什么?我补充道,如果我用一个简单的循环顺序启动模拟,我会得到正确的输出。感谢的支持

我添加了一个不可复制的代码示例,因为我不可能共享由许多模块和xml文件组成的模型。

import time
import multiprocessing
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)
def load(dir):
result = list()
names = list()
for filename in os.scandir(dir):
if filename.path.endswith('.xml'):
result.append(filename.path)
names.append(filename.name[:-4])
return result, names
def run(xml, name):
model = MYMODEL()
model.override_xml(xml)
model.run()
new_dir = os.path.join("C:\", name)
os.mkdir(new_dir)
model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents" + ".pkl"))
model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model" + ".pkl"))
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
processes = []
for path, name in zip(paths, names):
p = multiprocessing.Process(target=run, args=(path, name))
processes.append(p)
p.start()
for process in processes:
process.join()

我可以详细阐述我的评论,但遗憾的是,看着你的代码,对你的模型一无所知,我看不出你提到的问题有明显的原因。

我在评论中提到,我会根据您的处理是I/O绑定还是CPU绑定来使用线程池或处理器池,以便更好地控制您创建的线程/进程的数量。虽然线程的创建开销较小,但Python解释器将在同一进程中执行,因此在执行Python字节码时没有并行性,因为必须首先获得全局解释器锁(GIL(。因此,处理器池通常被推荐用于CPU密集型作业。然而,当执行发生在用C语言实现的运行库中时,例如numpypandas的情况,Python解释器会释放GIL,并且您仍然可以与线程保持高度并行。但我不知道ProtonOC类实例进行了什么性质的处理。如果它显然与I/O相关,则有些。因此,现在我建议您首先尝试一个线程池,我为其任意设置了最大大小20(我凭空得出的数字(。这里的问题是,你正在对你的磁盘进行并发操作,我不知道过多的线程是否会减慢磁盘操作(你有一个固态驱动器,手臂移动不是问题吗?(

如果您在MAX_CONCURRENCY设置为1的情况下运行以下代码示例,那么它应该可以工作。当然,这不是你的最终目标。但它展示了设置并发性是多么容易。

import time
from concurrent.futures import ThreadPoolExecutor as Executor
from model import ProtonOC
import random
import os
import numpy as np
import sys
sys.setrecursionlimit(100000)
def load(dir):
result = list()
names = list()
for filename in os.scandir(dir):
if filename.path.endswith('.xml'):
result.append(filename.path)
names.append(filename.name[:-4])
return result, names
def run(xml, name):
model = ProtonOC()
model.override_xml(xml)
model.run()
new_dir = os.path.join("C:\", name)
os.mkdir(new_dir)
model.datacollector.get_agent_vars_dataframe().to_pickle(os.path.join(new_dir, "agents.pkl"))
model.datacollector.get_model_vars_dataframe().to_pickle(os.path.join(new_dir, "model.pkl"))
if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
MAX_CONCURRENCY = 20 # never more than this
N_WORKERS = min(len(paths), MAX_CONCURRENCY)
with Executor(max_workers=N_WORKERS) as executor:
executor.map(run, paths, names)

要使用进程池,请更改:

from concurrent.futures import ThreadPoolExecutor as Executor

至:

from concurrent.futures import ProcessPoolExecutor as Executor

然后,您可能希望更改MAX_CONCURRENCY。但是,由于作业仍然涉及大量I/O,并且在执行此I/O时会放弃处理器,因此您可能会从该值中受益,该值大于您拥有的CPU数量。

更新

使用ThreadPoolExecutor类的map方法的一个替代方案是使用submit。这使您有机会在个人工作提交的基础上处理任何异常:

if __name__ == '__main__':
paths, names = load("C:\") #The folder that contains xml files
MAX_CONCURRENCY = 20 # never more than this
N_WORKERS = min(len(paths), MAX_CONCURRENCY)
with Executor(max_workers=N_WORKERS) as executor:
futures = [executor.submit(run, path, name) for path, name in zip(paths, names)]
for future in futures:
try:
result = future.get() # return value from run, which is None
except Exception as e: # any exception run might have thrown
print(e) # handle this as you see fit

您应该知道,这会逐个提交作业,而map在和ProcessPoolExecutor一起使用时,允许您指定chunksize参数。如果要提交的池大小为N和M个作业,其中M远大于N,则将chunksize作业一次放置在池中每个进程的工作队列中比一次放置一个作业更有效,以减少所需的共享内存传输次数。但是,只要您使用的是线程池,这是不相关的。

相关内容

  • 没有找到相关文章

最新更新