i有一个Population
类,该类由Individual
类的实例组成。一个人可以使用函数change_val
更改个体的属性;在我的实际代码中,此更改可能需要很长时间,并且个人之间的处理时间可能会大不相同。一个人的变化独立于其他人,因此我想使用多处理来加快在人群中更新所有个人的过程(与仅使用简单的for-loop相比(。
那是我的玩具系统代码的骨架:
import numpy as np
import multiprocessing as mp
class Population(object):
def __init__(self, pool_proc):
self.individuals = []
self.pool_proc = pool_proc
def add_individual(self, individual):
self.individuals.append(individual)
def change_individuals_loop(self):
# in a loop, it works fine
for indi in self.individuals:
indi.change_val()
def change_individuals_multi(self):
# this does -of course - not work as change_val is not known. How would it be done correctly?
self.pool_proc.apply_async(change_val, self.individuals)
def print_pop(self):
for indi in self.individuals:
print "value: {}, exponent: {}".format(indi.val, indi.exponent)
class Individual(object):
def __init__(self, some_val, exponent):
self.val = some_val
self.exponent = exponent
def change_val(self):
self.val = self.val ** self.exponent
if __name__ == '__main__':
# just for reproducibility purposes
np.random.seed(1)
my_pool = mp.Pool(processes=5)
my_pop = Population(my_pool)
for indi in range(1, 6):
my_pop.add_individual(Individual(indi, np.random.choice(5)))
print "initially:"
my_pop.print_pop()
my_pop.change_individuals_loop()
print "nfirst iteration:"
my_pop.print_pop()
我的问题是我将如何重写函数change_individuals_multi
,以便它给我与change_individuals_loop
相同的输出。
问题是行
self.pool_proc.apply_async(change_val, self.individuals)
确实 - 当然 - 由于功能change_val
是未知的,因此无法使用。我将如何修改此行或代码结构才能使其正常工作?如果比apply_async
更适合这些目的,那么在这方面的建议非常受欢迎。
使其成为一个函数:
self.pool_proc.apply_async(lambda individual: individual.change_val(), self.individuals)
要获得值,您需要返回某些内容,并处理返回值。有很多方法可以这样做,一个例子:
from multiprocessing import Pool
def workerfn((ndx, individual)):
individual.change_val()
return ndx, individual.val
...
pool = Pool(...)
for ndx, val in pool.imap_unordered(workerfn, enumerate(self.individuals)):
self.individuals[ndx].val = val
更新:为什么对100 000个人的多手术速度较慢?
大多数尝试过多线程/处理的人很早就遇到了这一点。原因很简单:开销。在单线程版本中,您可以执行函数调用 dextonation 分配,而在多线程版本中,您可以在单线螺纹版本 启动过程池中完成所有操作100K结果的序列化 相同 概要通信的避免化(结果( 将结果分配给对象...我并不感到惊讶的是它会慢慢;-)
使多个流程在常规的多核设置(我不是在谈论100个内核(更快地工作,您需要大量的工作/数据,分为您分配给每个过程的较大块。例如。划分100 000/核心数,并发送一个个人列表,而不是一对一发送。
当您将对象发送到另一个过程时,Python需要序列化并进行序列化,因为另一个过程运行了一个完全独立的Python解释器。与发送基本类型(如元组/列表/等(相比,这需要大量时间。尝试发送计算而不是单个对象的参数。
最后,要完成的工作数量需要花费的时间比进行校正呼叫 返回所需的时间。