Python 并行化代码花费的时间比应有的要长得多



编辑:我写这篇文章的目的是了解时间消耗的来源。我也欢迎其他建议,但我主要关心的是,我想了解的是为什么我的代码没有通过并行化加快速度?是什么原因导致并行化变慢?

我之前问过一个关于这个问题的问题,现在我意识到这不是一个好问题。对于可怜的帖子,我深表歉意。所以我再次要求花更多的精力来解决它。

我已经设法实现了并行化解决方案。但是,并行化代码比序列化版本慢得多。

编辑:下面的foo()函数相当简单,可以更简洁地表达,但函数的实际版本有点复杂。主要问题仍然是这样一个事实,即在数千个数组中,每个数组的长度为 ~70,000,比较的绝对数量是导致缓慢的原因。因此,并行化似乎是这里的最佳解决方案。当然,欢迎提出提高步骤效率的建议,我赞赏任何此类建议。

问题

考虑一个 numpy 数组列表。我需要对列表中的这些数组进行成对比较。我真正的问题有数千个长度为 ~70,000 的数组,但下面的玩具示例的数字要小得多(不过可以使用listLenarrayLen变量进行调整)

尝试

这里foo()是将使用的比较功能。如果您尝试使用arrayLenlistLen,您会发现无论您选择什么值,并行化函数do_calculations_mp总是比非并行化版本do_calculations_no_mp慢。从我读到的内容来看,multiprocesing.Process的开销比multiprocess.Pool少,所以它不应该花这么长时间,对吧?

我真的很感激这方面的任何帮助。

import numpy as np
from multiprocessing import Process
import itertools
import random
import sys
from datetime import datetime
def foo(arr1, arr2):
matches = 0
for i in range(len(arr1)):
if arr1[i] == arr2[i]:
matches += 1
return(matches)

def do_calculations_mp(aList):
flag_indices = []
processes = []
index_combns = list(itertools.combinations(range(len(aList)),2))
for i,j in index_combns:
p = Process(target = foo, args = (aList[i], aList[j]))
processes.append(p)
p.start()
for procs in processes:
procs.join()
return(flag_indices)

def do_calculations_no_mp(aList):
flag_indices = []
index_combns = list(itertools.combinations(range(len(aList)),2))
for i,j in index_combns:
numMatches = foo(aList[i], aList[j])
return(flag_indices)
if __name__ == '__main__':
listLen = 50
arrayLen = 300
# Creates a list of listLen arrays, where each array has length arrayLen
myList = [np.array([random.choice([0,1,2,5]) for i in range(arrayLen)]) for x in range(listLen)]
print("Processing No MP:             " + str(datetime.now()))
flagged = do_calculations_no_mp(myList)
print("Done processing No MP:        " + str(datetime.now()))
print("Processing MP:                " + str(datetime.now()))
flagged_mp = do_calculations_mp(myList)
print("Done processing MP:           " + str(datetime.now()))

几个一般要点:

  1. 您有有限数量的处理器。您的代码正在创建与您拥有的组合数量相等的进程数,这可能(大大)超过处理器的数量,这是毫无意义的。
  2. 不能使用return语句返回进程的结果。您需要另一种机制来执行此操作,例如将multiprocessing.Queue实例传递给写入结果的进程(还有其他方法)。
  3. 使用多处理池可解决问题 1. 和 2。
  4. 数组保留在共享内存中将减少多处理的开销。
  5. 与串行处理相比,CPU 密集型foo越多,多处理的性能就越好。这意味着要有很长的清单。

我已经修改了您的演示,并结合了这些想法,以证明多处理可以胜过串行处理。请注意,foo现在将myList作为全局变量(共享内存数组的列表)进行访问,因此现在只需传递列表索引。

from multiprocessing import Pool, Array
import itertools
import random
import sys
from time import time_ns

listLen = 6
arrayLen = 5_000_000
def foo(i, j):
arr1 = myList[i]
arr2 = myList[j]
matches = 0
for index in range(arrayLen):
if arr1[index] == arr2[index]:
matches += 1
return(matches)
def generate_args():
index_combns = list(itertools.combinations(range(listLen), 2))
for i, j in index_combns:
yield (i, j)
def init_pool_processes(aList):
global myList
myList = aList
def do_calculations_mp():
numMatches = 0
with Pool(initializer=init_pool_processes, initargs=(myList,)) as pool:
for matches in pool.starmap(foo, generate_args()):
numMatches += matches
pool.close()
pool.join()
return numMatches
def do_calculations_no_mp():
numMatches = 0
for i, j in generate_args():
numMatches += foo(i, j)
return numMatches
if __name__ == '__main__':
# Creates a list of listLen arrays, where each array has length arrayLen
myList = [Array('i', [random.choice([0,1,2,5]) for i in range(arrayLen)], lock=False) for x in range(listLen)]
start = time_ns()
numMatches = do_calculations_no_mp()
elapsed = (time_ns() - start) / 1_000_000_000
print(f"Done processing No MP, numMatches = {numMatches}, elapsed time = {elapsed}")
start = time_ns()
numMatches = do_calculations_mp()
elapsed = (time_ns() - start) / 1_000_000_000
print(f"Done processing MP, numMatches = {numMatches}, elapsed time = {elapsed}")

指纹:

Done processing No MP, numMatches = 18748321, elapsed time = 10.0353228
Done processing MP, numMatches = 18748321, elapsed time = 2.7033887

但是,将listLen定义为 70_000(大约是列表的长度),我们得到:

Done processing No MP, numMatches = 261721, elapsed time = 0.1350159
Done processing MP, numMatches = 261721, elapsed time = 0.2939995

因此,问题是,在列表变长之前,foo以当前的复杂性,CPU密集度不足以提高多处理的性能。当然,如果你的实际foo要复杂得多,那么......

随着组合的数量变得非常大,您可能需要考虑将方法multiprocessing.pool.imap_unordered与合适的块大小参数一起使用。 然后需要重新定义foo以采用单个元组参数。

如果性能很重要,让我们从原则开始

Python 解释器正在解释说明,请使用dis.dis( foo )在下面查看它们。

如果生成进程,Python 解释器是昂贵的(如你的第一个问题中所述)

众所周知,如果使用串行迭代器(read for-loops,基于生成器的任务农业等),Python Interpreter会非常糟糕(读取缓慢)

Python 解释器为大约 18M 个任务的规模分配"轻量级"任务,从来自6E3list项的数据组合而成,每个项大约有1E6np.array实例 - 正如您在(现已删除)第二个问题中所描述的那样,如果以上所有内容在multiprocessing中同时使用,几乎肯定会杀死它自己的能力(性能方面)。

def foo(arr1, arr2):
matches = 0
for i in range(len(arr1)):
if arr1[i] == arr2[i]:
matches += 1
return(matches)

被证明为工作单元的MCVE表示(任务)

>>> dis.dis( foo )
2           0 LOAD_CONST               1 (0)
3 STORE_FAST               2 (matches)
3           6 SETUP_LOOP              59 (to 68)
9 LOAD_GLOBAL              0 (range)
12 LOAD_GLOBAL              1 (len)
15 LOAD_FAST                0 (arr1)
18 CALL_FUNCTION            1
21 CALL_FUNCTION            1
24 GET_ITER            
>>   25 FOR_ITER                39 (to 67)
28 STORE_FAST               3 (i)
4          31 LOAD_FAST                0 (arr1)
34 LOAD_FAST                3 (i)
37 BINARY_SUBSCR       
38 LOAD_FAST                1 (arr2)
41 LOAD_FAST                3 (i)
44 BINARY_SUBSCR       
45 COMPARE_OP               2 (==)
48 POP_JUMP_IF_FALSE       25
5          51 LOAD_FAST                2 (matches)
54 LOAD_CONST               2 (1)
57 INPLACE_ADD         
58 STORE_FAST               2 (matches)
61 JUMP_ABSOLUTE           25
64 JUMP_ABSOLUTE           25
>>   67 POP_BLOCK           
6     >>   68 LOAD_FAST                2 (matches)
71 RETURN_VALUE        

使用伪指令的数量作为必须完成多少工作的简化度量(其中所有CPU时钟+ RAM分配成本+ RAM-I/O + O/S系统管理时间必须计算并计算实际应计成本),我们开始看到所有这些(不可避免的)附加成本的相对成本,与实际有用的任务(即最终有多少伪指令花在我们想要计算的东西上--有用-WORK--,对比了已经燃烧的间接费用和每个任务的附加成本,这是实现和运行

所需的)如果同时争取性能和效率(资源使用模式),这个分数是主要的。

对于附加开销成本占主导地位的情况,这些情况直接是将反模式堆积到最坏后果的罪过。

对于实例化 + 每个任务的附加开销成本不到有用工作的 0.01% 的情况,我们仍然可能导致相当不令人满意的低加速(请参阅昨天发布的模拟器和相关详细信息,并查看被破坏的加速线 - 即从今天的一天运行时开始,如果将所有这些附加成本保持在现在一样高,您将收到大约一天的运行时间)

对于有用工作的范围(实际foo(),但性能提升以剥离其大部分性能反模式)强烈主导的情况,即减少所有附加组件 + 每个任务的开销成本,我们仍然看到阿姆达尔定律上限 - 如此不言自明 - ">收益递减定律">(因为添加更多资源不再提高性能, 即使我们添加无限多的CPU内核和喜欢)

让我们测试上面(按原样)和以下(重构时)的速度foo()

from zmq import Stopwatch; aClk = Stopwatch()       # a [us]-resolution clock
import numpy as np
def test( aSize = 1E6 ):
TheSIZE = int( min( 1E7, aSize ) )              # fused RAM-footprint < GB
A = np.random.random_integers( 0, 9, TheSIZE )  # pre-allocate, fill
B = np.random.random_integers( 0, 9, TheSIZE )  # pre-allocate, fill
aClk.start() #----------------------------------# CRITICAL SECTION.start
_ = np.where( A==B, 1, 0 ).sum()                #    smart compute "count"  
t = aClk.stop() #-------------------------------# CRITICAL SECTION.end
MASK = ( "test( aSize = {0:>20d} ) "
+ "took {1: >20d} [us] "
+ " {2:} matches found in random filled A, B "
+ " ~ {3:} [us/loop]"
)
print MASK.format( TheSIZE, t, _, float( t ) / TheSIZE )

使用相同的方法来测量foo( A, B )在串行迭代器代码中持续多长时间

一个"裸"-foo( A, B )每个数组单元大约需要800 [ns](!!)(我们加上
我们为移动参数数据~ 20[MB]而支付的所有 SER/xfer/DES 成本,每个任务,这可能需要并花费数十万个 CPU 时钟,只是为了将数据移动到远程进程(worker),然后它开始一些伪指令dis.dis( foo )如上所示, 哎哟,好痛。。)

np.where()-code 每个阵列单元的速度提高了大约19 [ns]42 倍(25 [ns]如果我的本地主机上的所有 4 核都在后台加载,桌面系统实时运行 3 个电视流,因此可以使用的可用 CPU 内核更少numpy多核代码技巧)

>>> test( 1E34 )
took 239712 [us]  999665 matches found in random filled A, B  ~ 0.0239712 [us/loop]
took 220671 [us] 1000246 matches found in random filled A, B  ~ 0.0220671 [us/loop]
took 227004 [us] 1001805 matches found in random filled A, B  ~ 0.0227004 [us/loop]
took 210587 [us]  999267 matches found in random filled A, B  ~ 0.0210587 [us/loop]
took 195863 [us]  998218 matches found in random filled A, B  ~ 0.0195863 [us/loop]
took 241407 [us] 1000017 matches found in random filled A, B  ~ 0.0241407 [us/loop]
took 193243 [us] 1001084 matches found in random filled A, B  ~ 0.0193243 [us/loop]

从这种比较的角度来看,开始尝试正确地重构该过程 - 目标是从6E3长的~10[MB]-large-np.arrays列表中爬取所有~ 18M成对组合,以进行所有成对比较。

越快越好,对吧?

相关内容

  • 没有找到相关文章

最新更新