我正在编写一个基于python 2.7中的deap包的遗传优化算法(目标是很快迁移到python 3(。由于这是一个相当繁重的过程,优化的某些部分使用多处理包进行处理。以下是我的程序概要:
- 配置读入并保存在
config
对象中 - 进行了一些额外的预计算,并将其保存在
config
对象中 - 优化开始(随机初始化群体,应用突变、交叉以找到更好的解决方案(,其中的一些部分(评估函数(在多处理中执行
- 结果已保存
对于求值函数,我们需要访问config
对象的某些部分(在第2阶段之后保持不变(。因此,我们使用全局(常量(变量对不同的核心进行访问
from deap import base
import multiprocessing
toolbox = base.Toolbox()
def evaluate(ind):
# compute evaluation using config object
return(obj1,obj2)
toolbox.register('evaluate',evaluate)
def init_pool_global_vars(self, _config):
global config
config = _config
...
# setting up multiprocessing
pool = multiprocessing.Pool(processes=72, initializer=self.init_pool_global_vars,
initargs=[config])
toolbox.register('map', pool.map_async)
...
while tic < max_time:
# creating new individuals
# computing in optimisation the objective function on the different individuals
jobs = toolbox.map(toolbox.evaluate, ind)
fits = jobs.get()
# keeping best individuals
我们基本上进行不同的迭代(对于循环来说很大(,直到达到最大时间。我注意到,如果我把配置对象做得更大(即,给它添加大属性,比如一个大的numpy数组(,即使代码仍然相同,它的运行速度也会慢得多(相同时间跨度的迭代次数更少(。因此,我想我会创建一个特定的config_multiprocessing
对象,它只包含多处理部分所需的属性,并将其作为全局变量传递,但当我在3个核心上运行它时,它比在大config
对象和72个核心上更慢,它稍微快一点,但不多。
在启动多处理循环之前,我应该做些什么来确保循环的速度不会受到配置对象或任何其他数据操作的影响?
在云中的Linux虚拟机上的Linux docker映像中运行。
joblib
包设计用于处理有大量numpy数组要分发给具有共享内存的工作线程的情况。如果您将共享内存中的数据视为"只读",就像您在场景中描述的那样,这一点尤其有用。您还可以创建可写共享内存,如文档中所述。
您的代码可能看起来像:
import os
import numpy as np
from joblib import Parallel, delayed
from joblib import dump, load
folder = './joblib_memmap'
try:
os.mkdir(folder)
except FileExistsError:
pass
def evaluate(ind, data):
# compute evaluation using shared memory data
return(obj1, obj2)
# just used to initialize memory mapped data
def init_memmap_data(original_data):
data_filename_memmap = os.path.join(folder, 'data_memmap')
dump(original_data, data_filename_memmap)
shared_data = load(data_filename_memmap, mmap_mode='r')
return shared_data
...
# however you set up indices needs to be changed here
indexes = range(10)
# however you load your numpy data needs to be done here
shared_data = init_memmap_data(numpy_array_to_share)
# change n_jobs as appropriate
results = Parallel(n_jobs=2)(delayed(evaluate)(ind, shared_data) for ind in indexes)
# get index of the maximum as the "best" individual
best_fit_individual = indexes[results.argmax()]
此外,joblib
支持可能比基于进程的后端更快的threading
后端。使用joblib测试这两种情况将很容易。