我正在努力熟悉Python中的多处理。表现没有达到我的预期;因此,我正在寻求如何提高工作效率的建议。
让我首先陈述一下我的目标:我基本上有一堆data
列表。这些列表中的每一个都可以独立地处理,例如通过一些伪例程do_work
。我在实际程序中的实现很慢(比在单个进程中连续执行相同操作慢(。我想知道这是否是由于多进程编程所涉及的酸洗/去酸洗开销。
因此,我尝试使用共享内存来实现一个版本。由于我分配工作的方式确保了没有两个进程试图同时写入同一块内存,所以我使用multiprocessing.RawArray
和RawValue
。事实证明,具有共享内存的版本甚至更慢。
我的代码如下:main_pass
和worker_pass
使用返回语句实现并行化,而main_shared
和worker_shared
使用共享内存。
import multiprocessing, time, timeit, numpy as np
data = None
def setup():
return np.random.randint(0,100, (1000,100000)).tolist(), list(range(1000))
def do_work(input):
output = []
for j in input:
if j % 3 == 0:
output.append(j)
return output
def main_pass():
global data
data, instances = setup()
with multiprocessing.Pool(4) as pool:
start = time.time()
new_blocks = pool.map(worker_pass, instances)
print("done", time.time() - start)
def worker_pass(i):
global data
return do_work(data[i])
def main_shared():
global data
data, instances = setup()
data = [(a := multiprocessing.RawArray('i', block), multiprocessing.RawValue('i', len(a))) for block in data]
with multiprocessing.Pool(4) as pool:
start = time.time()
pool.map(worker_shared, instances)
print("done", time.time() - start)
new_blocks = [list(a[:l.value]) for a, l in data]
print(new_blocks)
def worker_shared(i):
global data
array, length = data[i]
new_block = do_work(array[:length.value])
array[:len(new_block)] = new_block
length.value = len(new_block)
import timeit
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
print(timeit.timeit(lambda: main_pass(), number=1))
print(timeit.timeit(lambda: main_shared(), number=1))
我得到的时间:
done 7.257717132568359
10.633161254
done 7.889772891998291
38.037218965
因此,首先运行的版本(使用return(比将结果写入共享内存的版本快得多。
为什么会这样?
顺便说一下。,是否可以方便地测量在酸洗/去酸洗上花费的时间?
信息:我在MacOS10.15上使用python 3.9。
关于worker_pass
的返回输出通过酸洗完成的说法是正确的,但额外的开销显然不能补偿worker_shared
对";重新包装";CCD_ 11实例。当被迫对worker_pass
案例使用酸洗时,性能得到改善,就像在使用spawn
创建新流程的平台上一样。
在下面的spawn
演示中,我为随机数生成器设置了一个特定值的种子,这样我就可以为两次运行获得相同的生成值,并打印出所有返回随机数的总和,以确保两次运行都进行了等效的处理。很明显,如果您只对池的创建(其中非共享内存的开销是(和map
进行计时,那么使用共享内存阵列现在的性能会更好。但是,当您包括使用共享内存阵列所需的额外设置时间和后处理时间时,时间差异并没有那么大:
import multiprocessing, time, timeit, numpy as np
def setup():
np.random.seed(seed=1)
return np.random.randint(0,100, (1000,100000)).tolist(), list(range(1000))
def init_process_pool(the_data):
global data
data = the_data
def do_work(input):
output = []
for j in input:
if j % 3 == 0:
output.append(j)
return output
def main_pass():
data, instances = setup()
start = time.time()
with multiprocessing.Pool(4, initializer=init_process_pool, initargs=(data,)) as pool:
new_blocks = pool.map(worker_pass, instances)
print("done", time.time() - start)
print(sum(sum(new_block) for new_block in new_blocks))
def worker_pass(i):
global data
return do_work(data[i])
def main_shared():
data, instances = setup()
data = [(a := multiprocessing.RawArray('i', block), multiprocessing.RawValue('i', len(a))) for block in data]
start = time.time()
with multiprocessing.Pool(4, initializer=init_process_pool, initargs=(data,)) as pool:
pool.map(worker_shared, instances)
print("done", time.time() - start)
new_blocks = [list(a[:l.value]) for a, l in data]
#print(new_blocks)
print(sum(sum(new_block) for new_block in new_blocks))
def worker_shared(i):
global data
array, length = data[i]
new_block = do_work(array[:length.value])
array[:len(new_block)] = new_block
length.value = len(new_block)
import timeit
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
print(timeit.timeit(lambda: main_pass(), number=1))
print(timeit.timeit(lambda: main_shared(), number=1))
打印:
done 17.68915629386902
1682969169
20.2827687
done 3.9250364303588867
1682969169
23.2993996