以下for循环是迭代模拟过程的一部分,是计算时间的主要瓶颈:
import numpy as np
class Simulation(object):
def __init__(self,n_int):
self.n_int = n_int
def loop(self):
for itr in range(self.n_int):
#some preceeding code which updates rows_list and diff with every itr
cols_red_list = []
rows_list = list(range(2500)) #row idx for diff where negative element is known to appear
diff = np.random.uniform(-1.323, 3.780, (2500, 300)) #np.random.uniform is just used as toy example
for row in rows_list:
col = next(idx for idx, val in enumerate(diff[row,:]) if val < 0)
cols_red_list.append(col)
# some subsequent code which uses the cols_red_list data
sim1 = Simulation(n_int=10)
sim1.loop()
因此,我试图通过使用多处理包将其并行化,以减少计算时间:
import numpy as np
from multiprocessing import Pool, cpu_count
from functools import partial
def crossings(row, diff):
return next(idx for idx, val in enumerate(diff[row,:]) if val < 0)
class Simulation(object):
def __init__(self,n_int):
self.n_int = n_int
def loop(self):
for itr in range(self.n_int):
#some preceeding code which updates rows_list and diff with every
rows_list = list(range(2500))
diff = np.random.uniform(-1, 1, (2500, 300))
if __name__ == '__main__':
num_of_workers = cpu_count()
print('number of CPUs : ', num_of_workers)
pool = Pool(num_of_workers)
cols_red_list = pool.map(partial(crossings,diff = diff), rows_list)
pool.close()
print(len(cols_red_list))
# some subsequent code which uses the cols_red_list data
sim1 = Simulation(n_int=10)
sim1.loop()
不幸的是,与顺序代码相比,并行化要慢得多。因此,我的问题是:在那个特定的例子中,我是否正确地使用了多处理包?有没有其他方法可以并行化上面提到的for循环?
免责声明:当您试图通过并行化来减少代码的运行时间时,这并不能严格回答您的问题,但它可能仍然是一个很好的学习机会。
作为黄金法则,在转向多处理以改进之前性能(执行时间),应首先优化单螺纹外壳。
您的
rows_list = list(range(2500))
生成数字0
到2499
(即range
)并将其存储在内存(list
)中,这需要时间来分配所需内存和进行实际写入。然后,通过从内存中读取这些可预测值(这也需要时间),以可预测的顺序,每个值只使用一次:
for row in rows_list:
这与loop
函数的运行时特别相关,因为它是重复执行的(for itr in range(n_int):
)。
相反,考虑只在需要的时候生成数字,而不需要中间存储(这在概念上消除了访问RAM的任何需要):
for row in range(2500):
其次,除了共享相同的问题(不必要的内存访问)之外,还有以下内容:
diff = np.random.uniform(-1, 1, (2500, 300))
# ...
col = next(idx for idx, val in enumerate(diff[row,:]) if val < 0)
在我看来,在数学(或逻辑)层面上是可以优化的。
您要做的是通过将随机变量(col
索引)定义为"我第一次在[-1;1]中遇到低于0的随机变量时"来获得随机变量。但请注意,如果在[-α;α]上具有均匀分布的随机变量为负,则与在{0,1}上具有随机变量(,即bool
)相同。
因此,您现在使用的是bool
s,而不是float
s,并且您甚至不必进行比较(val < 0
),因为您已经有了bool。这可能会使代码更快。使用与rows_list
相同的思想,您可以仅在需要时生成bool
;测试它,直到它是True
(或者False
,选择一个,这显然无关紧要)。通过这样做,您只生成所需数量的随机bool
,而不是更多也不是更少(顺便说一句,如果行中的300个元素都是负数,代码中会发生什么?;):
for _ in range(n_int):
cols_red_list = []
for row in range(2500):
col = next(i for i in itertools.count() if random.getrandbits(1))
cols_red_list.append(col)
或者,根据列表理解:
cols_red_list = [next(i for i in count() if getrandbits(1))
for _ in range(2500)]
我相信,通过适当的统计分析,你甚至可以将col
随机变量表示为[0;limit
]上的非均匀变量,这样你就可以更快地计算它
请先测试单线程实现的"优化"版本的性能。如果运行时仍然不可接受,那么您应该研究多线程。
multiprocessing
使用系统进程(而不是线程!)进行并行化,这需要昂贵的IPC(进程间通信)来共享数据。
这会让你陷入两个困境:
diff = np.random.uniform(-1, 1, (2500, 300))
创建了一个大矩阵,该矩阵的pickle/复制成本很高rows_list = list(range(2500))
创建了一个较小的列表,但这里也是如此
为了避免这种昂贵的IPC,你有一个半的选择:
- 如果在符合POSIX的系统上,请在模块级别初始化变量,这样每个进程都可以快速获得所需数据的脏拷贝。这是不可扩展的,因为它需要POSIX,奇怪的架构(你可能不想把所有东西都放在模块级别),并且不支持共享对数据的更改
- 使用共享内存。这只支持大多数基本数据类型,但
mp.Array
应该满足您的需求
第二个问题是设置池的成本很高,因为需要启动num_cpu
进程。与此开销相比,您的工作量小到可以忽略不计。一个好的做法是只创建一个池并重用它
下面是一个仅限POSIX的快速而肮脏的解决方案示例:
import numpy as np
from multiprocessing import Pool, cpu_count
from functools import partial
n_int = 10
rows_list = np.array(range(2500))
diff = np.random.uniform(-1, 1, (2500, 300))
def crossings(row, diff):
return next(idx for idx, val in enumerate(diff[row,:]) if val < 0)
def workload(_):
cols_red_list = [crossings(row, diff) for row in rows_list]
print(len(cols_red_list))
class Simulation(object):
def loop(self):
num_of_workers = cpu_count()
with Pool(num_of_workers) as pool:
pool.map(workload, range(10))
pool.close()
sim1 = Simulation()
sim1.loop()
对我(和我的两个核心)来说,这大约是顺序版本的两倍快。
使用共享内存更新:
import numpy as np
from multiprocessing import Pool, cpu_count, Array
from functools import partial
n_int = 10
ROW_COUNT = 2500
### WORKER
diff = None
result = None
def init_worker(*args):
global diff, result
(diff, result) = args
def crossings(i):
result[i] = next(idx for idx, val in enumerate(diff[i*300:(i+1)*300]) if val < 0)
### MAIN
class Simulation():
def loop(self):
num_of_workers = cpu_count()
diff = Array('d', range(ROW_COUNT*300), lock=False)
result = Array('i', ROW_COUNT, lock=False)
# Shared memory needs to be passed when workers are spawned
pool = Pool(num_of_workers, initializer=init_worker, initargs=(diff, result))
for i in range(n_int):
# SLOW, I assume you use a different source of values anyway.
diff[:] = np.random.uniform(-1, 1, ROW_COUNT*300)
pool.map(partial(crossings), range(ROW_COUNT))
print(len(result))
pool.close()
sim1 = Simulation()
sim1.loop()
几个注意事项:
- 共享内存需要在工作者创建时设置,所以它无论如何都是全局的
- 这仍然没有比顺序版本快,但这主要是因为random.uniform需要完全复制到共享内存中。我认为这只是测试的价值观,而实际上你无论如何都会用不同的方式来填充它
- 我只将索引传递给工作者,并使用它们来读取和写入共享内存中的值