I am using multiprocessing to train a neural network: one process builds batch samples and puts them into a queue, and the parent process reads from the queue and trains the network with PyTorch.
I noticed that the total training time with multiprocessing was not shorter than with a single process. Digging further, I found that although reading from the queue in the multiprocessing setup is faster than building the batches in a single process (as expected), the training step itself (identical code in both setups) takes longer under multiprocessing.
I wrote a simple script to illustrate this. See below:
import multiprocessing as mp
import numpy as np
import time

n = 200

def get_sample():
    local_loop = 400
    # data
    x = np.random.rand(n, n)
    p = np.random.rand(n, n)
    y = 0
    for i in range(local_loop):
        y += np.power(x, p)
    return y

def new_process(q_data, total_loops):
    for i in range(total_loops):
        q_data.put(get_sample())
    print('finish new process')

def main(multi_proc=False):
    st = time.time()
    total_loops = 100
    local_loop = 2500
    mt = 0
    other_t = 0
    st_multi = time.time()
    if multi_proc:
        q_data = mp.Queue()
        new_proc = mp.Process(target=new_process, args=(q_data, total_loops))
        new_proc.start()
    mt += time.time() - st_multi
    for i in range(total_loops):
        st_multi = time.time()
        if multi_proc:
            y = q_data.get()
        else:
            y = get_sample()
        mt += time.time() - st_multi
        other_st = time.time()
        for j in range(local_loop):
            y += np.random.rand(n, n)
        other_t += time.time() - other_st
    st_multi = time.time()
    if multi_proc:
        assert q_data.empty()
        new_proc.join()
    mt += time.time() - st_multi
    print('\nmulti_proc', multi_proc)
    print('multi_proc_time', mt)
    print('other_time', other_t)
    print(f'total time: {time.time()-st}')

if __name__ == '__main__':
    main(multi_proc=False)
    main(multi_proc=True)
When I run it, I get:
multi_proc False
multi_proc_time 36.44150114059448
other_time 39.08155846595764
total time: 75.5232412815094
finish new process
multi_proc True
multi_proc_time 0.4313678741455078
other_time 40.54900646209717
total time: 40.980711460113525
When multi_proc=True, other_time is more than a second longer, even though it should be identical in both cases. This is consistent across multiple platforms and multiple experiments, and in my real use case the slowdown exceeds the gains from multiprocessing, which makes it a serious problem.
Any clues?
Your results are exactly what I would expect. But is your benchmark realistic?
In the multiprocessing case there are two processes:
- the main process, which "constructs batch samples" and retrieves the results generated by the child process, and
- the child process running get_sample, which takes the samples created by the main process and puts its results on a queue for the main process to retrieve.
Both processes run in parallel, but the main process described above is trivial compared with the child process and requires very little CPU. So whatever you gain by running the two processes in parallel is cancelled out by the extra overhead of moving samples and results from one address space to the other.
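To get a rough feel for that transfer cost in isolation, here is a minimal sketch (the helper names are mine, not from your code) that does no computation at all and only pushes 200x200 result arrays through an mp.Queue. Each put() pickles the array and copies it through a pipe, and each get() unpickles a fresh copy on the other side:

```python
import multiprocessing as mp
import time
import numpy as np

def _producer(q, reps, n):
    # Each put() pickles the array and copies it through a pipe.
    arr = np.random.rand(n, n)
    for _ in range(reps):
        q.put(arr)

def time_transfers(reps=100, n=200):
    q = mp.Queue()
    p = mp.Process(target=_producer, args=(q, reps, n))
    p.start()
    st = time.time()
    for _ in range(reps):
        q.get()  # unpickles a fresh copy on this side
    p.join()
    return time.time() - st

if __name__ == '__main__':
    print(f'100 queue transfers of a 200x200 array: {time_transfers():.3f}s')
```

This pure transfer time is the overhead the multiprocessing version pays on every sample, on top of the actual computation.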
But what if creating a new batch sample were not so trivial? In the improved benchmark below, I make sure some CPU cycles are burned whenever a new sample is generated, by calling spin_cycles. For clarity I have arranged the code into a multiprocessing benchmark and a sequential benchmark:
import multiprocessing as mp
import numpy as np
import time

n = 200
total_loops = 20
local_loops = 400

def spin_cycles():
    # simulate real processing time:
    x = 0
    for i in range(10_000_000):
        x += i * i
    return x

########### Sequential Benchmark: #######################

def process_sequential(sample):
    # data
    x = np.random.rand(n, n)
    p = np.random.rand(n, n)
    y = 0
    for i in range(local_loops):
        y += np.power(x, p)
    return y

def sequential_processing():
    results = []
    for sample in range(total_loops):
        # simulate real processing time:
        spin_cycles()
        results.append(process_sequential(sample))
    return results

def main_sequential():
    st = time.time()
    results = sequential_processing()
    et = time.time()
    print('Sequential time:', et - st)

########## Multiprocessing Benchmark ################

def process_multi(in_q, out_q):
    for _ in range(total_loops):
        sample = in_q.get()
        # data
        x = np.random.rand(n, n)
        p = np.random.rand(n, n)
        y = 0
        for i in range(local_loops):
            y += np.power(x, p)
        out_q.put(y)

def construct_batch_samples_multi(in_q):
    for sample in range(total_loops):
        # simulate real processing time:
        spin_cycles()
        in_q.put(sample)

def main_multi():
    st = time.time()
    in_q, out_q = mp.Queue(), mp.Queue()
    p = mp.Process(target=process_multi, args=(in_q, out_q))
    p.start()
    construct_batch_samples_multi(in_q)
    results = [out_q.get() for _ in range(total_loops)]
    et = time.time()
    p.join()
    print('Multiprocessing time:', et - st)

########### Run Benchmarks #######################

if __name__ == '__main__':
    main_multi()
    main_sequential()
This prints:
Multiprocessing time: 18.932964086532593
Sequential time: 28.5939993858337
Update
This is to demonstrate that the time spent actually processing the samples is the same for the multiprocessing and sequential benchmarks. Specifically, I measure the total time spent processing samples in the multiprocessing case, excluding the time spent reading from and writing to the queue, to allow a more direct comparison with the sequential case. I also use time.process_time() to measure actual CPU time, and I have removed the call to spin_cycles so as not to confuse the issue.
import multiprocessing as mp
import numpy as np
import time

n = 200
total_loops = 20
local_loops = 400

########### Sequential Benchmark: #######################

def sequential_processing():
    results = []
    start_time = time.process_time()
    for sample in range(total_loops):
        # data
        x = np.random.rand(n, n)
        p = np.random.rand(n, n)
        y = 0
        for i in range(local_loops):
            y += np.power(x, p)
        results.append(y)
    processing_time = time.process_time() - start_time
    print("Sequential processing time:", processing_time)
    return results

def main_sequential():
    st = time.time()
    results = sequential_processing()
    et = time.time()
    print('Total elapsed time:', et - st)

########## Multiprocessing Benchmark ################

def process_multi(q):
    samples = [q.get() for _ in range(total_loops)]
    results = []
    t = time.process_time()
    for sample in samples:
        # data
        x = np.random.rand(n, n)
        p = np.random.rand(n, n)
        y = 0
        for i in range(local_loops):
            y += np.power(x, p)
        results.append(y)
    processing_time = time.process_time() - t
    print('Multiprocessing processing time:', processing_time)
    return results

def construct_batch_samples_multi(q):
    for sample in range(total_loops):
        q.put(sample)

def main_multi():
    st = time.time()
    q = mp.Queue()
    p = mp.Process(target=construct_batch_samples_multi, args=(q,))
    p.start()
    process_multi(q)
    et = time.time()
    p.join()
    print('Total multiprocessing elapsed time:', et - st)

########### Run Benchmarks #######################

if __name__ == '__main__':
    main_multi()
    main_sequential()
This prints:
Multiprocessing processing time: 10.796875
Total multiprocessing elapsed time: 11.060487270355225
Sequential processing time: 10.96875
Total elapsed time: 11.052014112472534
The two processing times are essentially indistinguishable (in this run the multiprocessing version happened to be slightly faster).
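One caveat worth keeping in mind with this measurement: time.process_time() counts CPU time of the calling process only, so work done in a child process never shows up in the parent's reading. A minimal sketch (helper names are mine) contrasting the parent's CPU time with wall-clock time while a child burns CPU:

```python
import multiprocessing as mp
import time

def _burn():
    # CPU-bound work in the child; invisible to the parent's process_time().
    x = 0
    for i in range(5_000_000):
        x += i * i

def parent_times():
    cpu0, wall0 = time.process_time(), time.time()
    p = mp.Process(target=_burn)
    p.start()
    p.join()
    # Parent CPU time stays near zero; wall time covers the child's work.
    return time.process_time() - cpu0, time.time() - wall0

if __name__ == '__main__':
    cpu, wall = parent_times()
    print(f'parent CPU time: {cpu:.3f}s, wall time: {wall:.3f}s')
```

That is why each process above has to report its own processing time, and why the elapsed (wall-clock) totals are the right numbers to compare across the two benchmarks.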