编辑:我写这篇文章的目的是了解时间消耗的来源。我也欢迎其他建议,但我主要关心的是,我想了解的是为什么我的代码没有通过并行化加快速度?是什么原因导致并行化变慢?
我之前问过一个关于这个问题的问题,现在我意识到这不是一个好问题。对于可怜的帖子,我深表歉意。所以我再次要求花更多的精力来解决它。
我已经设法实现了并行化解决方案。但是,并行化代码比序列化版本慢得多。
编辑:下面的foo()
函数相当简单,可以更简洁地表达,但函数的实际版本有点复杂。主要问题仍然是这样一个事实,即在数千个数组中,每个数组的长度为 ~70,000,比较的绝对数量是导致缓慢的原因。因此,并行化似乎是这里的最佳解决方案。当然,欢迎提出提高步骤效率的建议,我赞赏任何此类建议。
问题
考虑一个 numpy 数组列表。我需要对列表中的这些数组进行成对比较。我真正的问题有数千个长度为 ~70,000 的数组,但下面的玩具示例的数字要小得多(不过可以使用listLen
和arrayLen
变量进行调整)
尝试
这里foo()
是将使用的比较功能。如果您尝试使用arrayLen
和listLen
,您会发现无论您选择什么值,并行化函数do_calculations_mp
总是比非并行化版本do_calculations_no_mp
慢。从我读到的内容来看,multiprocesing.Process
的开销比multiprocess.Pool
少,所以它不应该花这么长时间,对吧?
我真的很感激这方面的任何帮助。
import numpy as np
from multiprocessing import Process
import itertools
import random
import sys
from datetime import datetime
def foo(arr1, arr2):
matches = 0
for i in range(len(arr1)):
if arr1[i] == arr2[i]:
matches += 1
return(matches)
def do_calculations_mp(aList):
flag_indices = []
processes = []
index_combns = list(itertools.combinations(range(len(aList)),2))
for i,j in index_combns:
p = Process(target = foo, args = (aList[i], aList[j]))
processes.append(p)
p.start()
for procs in processes:
procs.join()
return(flag_indices)
def do_calculations_no_mp(aList):
flag_indices = []
index_combns = list(itertools.combinations(range(len(aList)),2))
for i,j in index_combns:
numMatches = foo(aList[i], aList[j])
return(flag_indices)
if __name__ == '__main__':
listLen = 50
arrayLen = 300
# Creates a list of listLen arrays, where each array has length arrayLen
myList = [np.array([random.choice([0,1,2,5]) for i in range(arrayLen)]) for x in range(listLen)]
print("Processing No MP: " + str(datetime.now()))
flagged = do_calculations_no_mp(myList)
print("Done processing No MP: " + str(datetime.now()))
print("Processing MP: " + str(datetime.now()))
flagged_mp = do_calculations_mp(myList)
print("Done processing MP: " + str(datetime.now()))
几个一般要点:
- 您有有限数量的处理器。您的代码正在创建与您拥有的组合数量相等的进程数,这可能(大大)超过处理器的数量,这是毫无意义的。
- 不能使用
return
语句返回进程的结果。您需要另一种机制来执行此操作,例如将multiprocessing.Queue
实例传递给写入结果的进程(还有其他方法)。 - 使用多处理池可解决问题 1. 和 2。 将
- 数组保留在共享内存中将减少多处理的开销。
- 与串行处理相比,CPU 密集型
foo
越多,多处理的性能就越好。这意味着要有很长的清单。
我已经修改了您的演示,并结合了这些想法,以证明多处理可以胜过串行处理。请注意,foo
现在将myList
作为全局变量(共享内存数组的列表)进行访问,因此现在只需传递列表索引。
from multiprocessing import Pool, Array
import itertools
import random
import sys
from time import time_ns
listLen = 6
arrayLen = 5_000_000
def foo(i, j):
arr1 = myList[i]
arr2 = myList[j]
matches = 0
for index in range(arrayLen):
if arr1[index] == arr2[index]:
matches += 1
return(matches)
def generate_args():
index_combns = list(itertools.combinations(range(listLen), 2))
for i, j in index_combns:
yield (i, j)
def init_pool_processes(aList):
global myList
myList = aList
def do_calculations_mp():
numMatches = 0
with Pool(initializer=init_pool_processes, initargs=(myList,)) as pool:
for matches in pool.starmap(foo, generate_args()):
numMatches += matches
pool.close()
pool.join()
return numMatches
def do_calculations_no_mp():
numMatches = 0
for i, j in generate_args():
numMatches += foo(i, j)
return numMatches
if __name__ == '__main__':
# Creates a list of listLen arrays, where each array has length arrayLen
myList = [Array('i', [random.choice([0,1,2,5]) for i in range(arrayLen)], lock=False) for x in range(listLen)]
start = time_ns()
numMatches = do_calculations_no_mp()
elapsed = (time_ns() - start) / 1_000_000_000
print(f"Done processing No MP, numMatches = {numMatches}, elapsed time = {elapsed}")
start = time_ns()
numMatches = do_calculations_mp()
elapsed = (time_ns() - start) / 1_000_000_000
print(f"Done processing MP, numMatches = {numMatches}, elapsed time = {elapsed}")
指纹:
Done processing No MP, numMatches = 18748321, elapsed time = 10.0353228
Done processing MP, numMatches = 18748321, elapsed time = 2.7033887
但是,将listLen
定义为 70_000(大约是列表的长度),我们得到:
Done processing No MP, numMatches = 261721, elapsed time = 0.1350159
Done processing MP, numMatches = 261721, elapsed time = 0.2939995
因此,问题是,在列表变长之前,foo
以当前的复杂性,CPU密集度不足以提高多处理的性能。当然,如果你的实际foo
要复杂得多,那么......
随着组合的数量变得非常大,您可能需要考虑将方法multiprocessing.pool.imap_unordered
与合适的块大小参数一起使用。 然后需要重新定义foo
以采用单个元组参数。
如果性能很重要,让我们从原则开始
Python 解释器正在解释说明,请使用dis.dis( foo )
在下面查看它们。
如果生成进程,Python 解释器是昂贵的(如你的第一个问题中所述)
众所周知,如果使用串行迭代器(read for-loops,基于生成器的任务农业等),Python Interpreter会非常糟糕(读取缓慢)
Python 解释器为大约 18M 个任务的规模分配"轻量级"任务,从来自6E3
list
项的数据组合而成,每个项大约有1E6
长np.array
实例 - 正如您在(现已删除)第二个问题中所描述的那样,如果以上所有内容在multiprocessing
中同时使用,几乎肯定会杀死它自己的能力(性能方面)。
def foo(arr1, arr2):
matches = 0
for i in range(len(arr1)):
if arr1[i] == arr2[i]:
matches += 1
return(matches)
被证明为工作单元的MCVE表示(任务)
>>> dis.dis( foo )
2 0 LOAD_CONST 1 (0)
3 STORE_FAST 2 (matches)
3 6 SETUP_LOOP 59 (to 68)
9 LOAD_GLOBAL 0 (range)
12 LOAD_GLOBAL 1 (len)
15 LOAD_FAST 0 (arr1)
18 CALL_FUNCTION 1
21 CALL_FUNCTION 1
24 GET_ITER
>> 25 FOR_ITER 39 (to 67)
28 STORE_FAST 3 (i)
4 31 LOAD_FAST 0 (arr1)
34 LOAD_FAST 3 (i)
37 BINARY_SUBSCR
38 LOAD_FAST 1 (arr2)
41 LOAD_FAST 3 (i)
44 BINARY_SUBSCR
45 COMPARE_OP 2 (==)
48 POP_JUMP_IF_FALSE 25
5 51 LOAD_FAST 2 (matches)
54 LOAD_CONST 2 (1)
57 INPLACE_ADD
58 STORE_FAST 2 (matches)
61 JUMP_ABSOLUTE 25
64 JUMP_ABSOLUTE 25
>> 67 POP_BLOCK
6 >> 68 LOAD_FAST 2 (matches)
71 RETURN_VALUE
使用伪指令的数量作为必须完成多少工作的简化度量(其中所有CPU时钟+ RAM分配成本+ RAM-I/O + O/S系统管理时间必须计算并计算实际应计成本),我们开始看到所有这些(不可避免的)附加成本的相对成本,与实际有用的任务(即最终有多少伪指令花在我们想要计算的东西上--有用-WORK--,对比了已经燃烧的间接费用和每个任务的附加成本,这是实现和运行
所需的)如果同时争取性能和效率(资源使用模式),这个分数是主要的。
对于附加开销成本占主导地位的情况,这些情况直接是将反模式堆积到最坏后果的罪过。
对于实例化 + 每个任务的附加开销成本不到有用工作的 0.01% 的情况,我们仍然可能导致相当不令人满意的低加速(请参阅昨天发布的模拟器和相关详细信息,并查看被破坏的加速线 - 即从今天的一天运行时开始,如果将所有这些附加成本保持在现在一样高,您将收到大约一天的运行时间)
对于有用工作的范围(实际foo()
,但性能提升以剥离其大部分性能反模式)强烈主导的情况,即减少所有附加组件 + 每个任务的开销成本,我们仍然看到阿姆达尔定律上限 - 如此不言自明 - ">收益递减定律">(因为添加更多资源不再提高性能, 即使我们添加无限多的CPU内核和喜欢)
让我们测试上面(按原样)和以下(重构时)的速度foo()
:
from zmq import Stopwatch; aClk = Stopwatch() # a [us]-resolution clock
import numpy as np
def test( aSize = 1E6 ):
TheSIZE = int( min( 1E7, aSize ) ) # fused RAM-footprint < GB
A = np.random.random_integers( 0, 9, TheSIZE ) # pre-allocate, fill
B = np.random.random_integers( 0, 9, TheSIZE ) # pre-allocate, fill
aClk.start() #----------------------------------# CRITICAL SECTION.start
_ = np.where( A==B, 1, 0 ).sum() # smart compute "count"
t = aClk.stop() #-------------------------------# CRITICAL SECTION.end
MASK = ( "test( aSize = {0:>20d} ) "
+ "took {1: >20d} [us] "
+ " {2:} matches found in random filled A, B "
+ " ~ {3:} [us/loop]"
)
print MASK.format( TheSIZE, t, _, float( t ) / TheSIZE )
使用相同的方法来测量foo( A, B )
在串行迭代器代码中持续多长时间
一个"裸"-foo( A, B )
每个数组单元大约需要800 [ns]
(!!)(我们加上
我们为移动参数数据~ 20[MB]
而支付的所有 SER/xfer/DES 成本,每个任务,这可能需要并花费数十万个 CPU 时钟,只是为了将数据移动到远程进程(worker),然后它开始一些伪指令dis.dis( foo )
如上所示, 哎哟,好痛。。)
而np.where()
-code 每个阵列单元的速度提高了大约19 [ns]
42 倍(25 [ns]
如果我的本地主机上的所有 4 核都在后台加载,桌面系统实时运行 3 个电视流,因此可以使用的可用 CPU 内核更少numpy
多核代码技巧)
>>> test( 1E34 )
took 239712 [us] 999665 matches found in random filled A, B ~ 0.0239712 [us/loop]
took 220671 [us] 1000246 matches found in random filled A, B ~ 0.0220671 [us/loop]
took 227004 [us] 1001805 matches found in random filled A, B ~ 0.0227004 [us/loop]
took 210587 [us] 999267 matches found in random filled A, B ~ 0.0210587 [us/loop]
took 195863 [us] 998218 matches found in random filled A, B ~ 0.0195863 [us/loop]
took 241407 [us] 1000017 matches found in random filled A, B ~ 0.0241407 [us/loop]
took 193243 [us] 1001084 matches found in random filled A, B ~ 0.0193243 [us/loop]
从这种比较的角度来看,开始尝试正确地重构该过程 - 目标是从6E3
长的~10[MB]
-large-np.array
s列表中爬取所有~ 18M
成对组合,以进行所有成对比较。
越快越好,对吧?