所以,我最近一直在探索多处理和多线程主题。我发现,在某些情况下,使用多进程和多线程不会提高我的代码的速度。这里有一个例子:
import multiprocessing as mp
import time

# Sample data: the queue is loaded with alternating ints and letters
# (12 items total).
dummyList = [1, 2, 3, 4, 5, 6]
dummyList2 = ['a', 'b', 'c', 'd', 'e', 'f']

q_list = mp.Queue()
for i, j in zip(dummyList, dummyList2):
    q_list.put(i)
    q_list.put(j)


def f(queue):
    """Pop one item from *queue* and print it (blocks if the queue is empty)."""
    q = queue.get()
    print(q)


# Multiprocessing version (kept commented out, as in the question): spawns
# two fresh processes per loop iteration.  Process startup plus queue IPC
# dominates the trivial work done by f(), which is why this measured ~690 ms
# versus ~2 ms for the plain loop below.
# NOTE(review): with an odd number of queued items the second process would
# block forever inside queue.get() -- confirm before re-enabling.
# if __name__ == "__main__":
#     start = time.perf_counter()
#     while not q_list.empty():
#         p1 = mp.Process(target=f, args=[q_list])
#         p2 = mp.Process(target=f, args=[q_list])
#         p1.start()
#         p2.start()
#         p1.join()
#         p2.join()
#     finish = time.perf_counter()
#     print(f'elaspse time = {finish - start} second(s)')

# Single-process version: drain and print the queue directly.
start = time.perf_counter()
while not q_list.empty():
    f(q_list)
finish = time.perf_counter()
print(f'elaspse time = {finish - start} second(s)')
在上面的代码中,我尝试从 multiprocessing.Queue 的实例中逐个取出项目并打印,直到队列为空。我原以为在这种情况下使用多处理会提高速度。令人惊讶的是,它不但没有加快速度,反而变慢了!而且差别很大:不使用多处理只需要 2ms,而使用多处理则需要 690ms。
谁能给我解释一下,为什么会发生这种事?什么时候才是使用多处理/多线程的最佳时机?谢谢。

首先是创建新进程的开销,然后是读写多进程队列的开销,这些开销在向运行于同一进程中的函数传递参数时是不存在的。这意味着在本例中,你的"工人"函数 f 必须是足够"CPU 密集"的,才能证明我刚才提到的额外开销是合理的。
请参阅以下两个基准测试,其中的函数是纯 CPU 运算、没有 I/O,因此是多处理的候选场景。测试比较了单处理和多处理:在第一种情况下,函数不是 CPU 密集型的,多处理会损害性能;在第二种情况下,函数是 CPU 密集型的,多处理能提高性能:
import multiprocessing as mp
import time

# Number of loop iterations that takes roughly 0.25 s of pure CPU time
# on the author's machine.
QUARTER_SECOND_ITERATIONS = 5_000_000


def quarter_second():
    """Busy-loop for ~0.25 s of pure CPU work; return the iteration count."""
    total = 0  # renamed from `sum` to avoid shadowing the builtin
    for _ in range(QUARTER_SECOND_ITERATIONS):
        total += 1
    return total


# non-multiprocessing version:
def compute_square(x, cpu_intensive):
    """Compute x ** 2, optionally burning ~0.25 s of CPU first."""
    if cpu_intensive:
        quarter_second()
    return x ** 2


# multiprocessing version
def m_compute_square(input_q, output_q, cpu_intensive):
    """Worker: square items from input_q onto output_q until None arrives.

    Results are emitted as (x, x ** 2) tuples.
    """
    while True:
        x = input_q.get()
        if x is None:  # our signal to terminate
            break
        if cpu_intensive:
            quarter_second()
        # BUG FIX: the original wrote output_q.put(x, x ** 2), which passes
        # x ** 2 as Queue.put's *block* argument and silently drops the
        # square.  Emit a single (x, square) tuple instead.
        output_q.put((x, x ** 2))


def main():
    """Benchmark serial vs. two-process squaring, cheap and CPU-bound."""
    numbers = range(1, 101)
    for intensive in (False, True):
        t0 = time.perf_counter()
        results = [compute_square(x, cpu_intensive=intensive) for x in numbers]
        t1 = time.perf_counter()
        print(f'Non-multiprocessing time = {t1 - t0}, intensive = {intensive}')
        t0 = time.perf_counter()
        input_queue = mp.Queue()
        output_queue = mp.Queue()
        for x in numbers:
            input_queue.put(x)
        # Put two "no more input" indicators, one per worker process:
        input_queue.put(None)
        input_queue.put(None)
        p1 = mp.Process(target=m_compute_square,
                        args=(input_queue, output_queue, intensive))
        p2 = mp.Process(target=m_compute_square,
                        args=(input_queue, output_queue, intensive))
        p1.start()
        p2.start()
        results = [output_queue.get() for _ in range(100)]
        p1.join()
        p2.join()
        t1 = time.perf_counter()
        print(f'Mutiprocessing time = {t1 - t0}, intensive = {intensive}')


# Required for Windows:
if __name__ == '__main__':
    main()
打印:
Non-multiprocessing time = 3.600000000000825e-05, intensive = False
Mutiprocessing time = 0.1501859, intensive = False
Non-multiprocessing time = 25.417471099999997, intensive = True
Mutiprocessing time = 14.596532500000002, intensive = True
使用多处理池
import multiprocessing as mp
from functools import partial
import time

# Iteration count that costs roughly 0.25 s of CPU on the test machine.
QUARTER_SECOND_ITERATIONS = 5_000_000


def quarter_second():
    """Spin through ~0.25 s worth of additions; return the iteration count."""
    counter = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        counter += 1
    return counter


# non-multiprocessing version:
def compute_square(x, cpu_intensive):
    """Return x ** 2, optionally preceded by ~0.25 s of CPU burn."""
    if cpu_intensive:
        quarter_second()
    return x ** 2


def main():
    """Compare serial squaring against an 8-process pool, cheap vs. CPU-heavy."""
    values = range(1, 101)
    for intensive in (False, True):
        begin = time.perf_counter()
        serial_results = [compute_square(v, cpu_intensive=intensive) for v in values]
        elapsed = time.perf_counter() - begin
        print(f'Non-multiprocessing time = {elapsed}, intensive = {intensive}')

        begin = time.perf_counter()
        # Pool of 8 worker processes, one per available core:
        worker = partial(compute_square, cpu_intensive=intensive)
        with mp.Pool(8) as pool:
            pool_results = pool.map(worker, values)
        elapsed = time.perf_counter() - begin
        print(f'Mutiprocessing time = {elapsed}, intensive = {intensive}')


# Required for Windows:
if __name__ == '__main__':
    main()
打印:
Non-multiprocessing time = 3.9300000000006e-05, intensive = False
Mutiprocessing time = 0.22172129999999995, intensive = False
Non-multiprocessing time = 26.1021124, intensive = True
Mutiprocessing time = 7.3056439, intensive = True
使用多线程池
from multiprocessing.pool import ThreadPool
from functools import partial
import time

# Iteration count approximating 0.25 s of pure-Python CPU work.
QUARTER_SECOND_ITERATIONS = 5_000_000


def quarter_second():
    """Burn ~0.25 s of CPU in pure Python; return the number of iterations."""
    ticks = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        ticks += 1
    return ticks


# non-multithreading version:
def compute_square(x, cpu_intensive):
    """Return x ** 2, optionally doing ~0.25 s of CPU work first."""
    if cpu_intensive:
        quarter_second()
    return x ** 2


def main():
    """Compare serial squaring against an 8-thread pool, cheap vs. CPU-heavy."""
    values = range(1, 101)
    for intensive in (False, True):
        begin = time.perf_counter()
        serial_results = [compute_square(v, cpu_intensive=intensive) for v in values]
        print(f'Non-multithreading time = {time.perf_counter() - begin}, intensive = {intensive}')

        begin = time.perf_counter()
        # Pool of 8 worker threads (same process, shared GIL):
        worker = partial(compute_square, cpu_intensive=intensive)
        with ThreadPool(8) as pool:
            threaded_results = pool.map(worker, values)
        print(f'Mutithreading time = {time.perf_counter() - begin}, intensive = {intensive}')


# Required for Windows:
if __name__ == '__main__':
    main()
打印:
Non-multithreading time = 3.0000000000002247e-05, intensive = False
Mutithreading time = 0.03963000000000001, intensive = False
Non-multithreading time = 26.428487699999998, intensive = True
Mutithreading time = 29.0095318, intensive = True
因为"工人"函数是纯粹的CPU,多线程不能提高性能,实际上只会增加额外的开销。
Worker 函数主要是 I/O 密集型的情况
在下面的基准测试中,compute_square
通过休眠模拟等待I/O完成。在这种情况下,它是多线程的候选者,因为它大部分时间都不执行实际的Python字节码,因此很少有对全局解释器锁的争用。
from multiprocessing.pool import ThreadPool
from functools import partial
import time


def compute_square(x):
    """Square x after a 0.25 s sleep that stands in for blocking I/O."""
    time.sleep(.25)
    return x ** 2


def main():
    """Time serial vs. 8-thread execution of an I/O-bound (sleeping) worker."""
    values = range(1, 101)

    begin = time.perf_counter()
    serial_results = [compute_square(v) for v in values]
    print(f'Non-multithreading time = {time.perf_counter() - begin}')

    begin = time.perf_counter()
    # The sleep releases the GIL, so 8 threads overlap their waits:
    with ThreadPool(8) as pool:
        threaded_results = pool.map(compute_square, values)
    print(f'Mutithreading time = {time.perf_counter() - begin}')


if __name__ == '__main__':
    main()
打印:
Non-multithreading time = 25.1188871
Mutithreading time = 4.039328099999999