Python multiprocessing Queue and Pool are slower than a plain loop



I'm trying to add multiprocessing to a Python program that needs to run some CPU-intensive code. In my test code, both the multiprocessing Queue and the multiprocessing Pool are slower than a plain loop with no multiprocessing. During the Pool section of my code, I can see that CPU usage is maxed out. However, it is still slower than the plain loop! Is there something wrong with my code?

import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Pool
import random

def run_sims(iterations):
    sim_list = []
    for i in range(iterations):
        sim_list.append(random.uniform(0, 1))
    print(iterations, "count", sum(sim_list)/len(sim_list))
    return (sum(sim_list)/len(sim_list))

def worker(queue):
    i = 0
    while not queue.empty():
        task = queue.get()
        run_sims(task)
        i = i + 1

if __name__ == '__main__':
    queue = Queue()
    iterations_list = [30000000, 30000000, 30000000, 30000000, 30000000]
    it_len = len(iterations_list)

    ## Queue ##
    print("#STARTING QUEUE#")
    start_t = time.perf_counter()
    for i in range(it_len):
        iterations = iterations_list[i]
        queue.put(iterations)
    process = Process(target=worker, args=(queue, ))
    process.start()
    process.join()
    end_t = time.perf_counter()
    print("Queue time: ", end_t - start_t)

    ## Pool ##
    print("#STARTING POOL#")
    start_t = time.perf_counter()
    with Pool() as pool:
        results = pool.imap_unordered(run_sims, iterations_list)
        for res in results:
            res
    end_t = time.perf_counter()
    print("Pool time: ", end_t - start_t)

    ## No Multiprocessing - Normal Loop
    print("#STARTING NORMAL LOOP#")
    start_t = time.perf_counter()
    for i in iterations_list:
        run_sims(i)
    end_t = time.perf_counter()
    print("Normal time: ", end_t - start_t)

I ran the code above, but the multiprocessing sections are slower than the plain loop:

Queue time: 59 seconds

Pool time: 83 seconds

Normal loop time: 55 seconds

My expectation was that Queue and Pool would be significantly faster than the plain loop.

Add more processes to your queue code and it will perform about the same as the pool. On my machine, both the queue and the pool are significantly faster than the sequential version. I have 4 cores / 8 logical CPUs. Since this is a CPU-bound task, the performance difference will vary with the number of available CPUs and whatever other work the machine is doing.

This script keeps the number of workers below the CPU count. If these were network-bound tasks, a larger pool might run faster. Disk-bound tasks would likely not benefit from a larger pool.

import time
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Pool
from multiprocessing import cpu_count
import random

def run_sims(iterations):
    sim_list = []
    for i in range(iterations):
        sim_list.append(random.uniform(0, 1))
    print(iterations, "count", sum(sim_list)/len(sim_list))
    return (sum(sim_list)/len(sim_list))

def worker(queue):
    i = 0
    while not queue.empty():
        task = queue.get()
        run_sims(task)
        i = i + 1

if __name__ == '__main__':
    iteration_count = 5
    queue = Queue()
    iterations_list = [30000000] * iteration_count
    it_len = len(iterations_list)

    # guess a parallel execution size. CPU bound, and we want some
    # room for other processes.
    pool_size = max(min(cpu_count() - 2, len(iterations_list)), 2)
    print("Pool size", pool_size)

    ## Queue ##
    print("#STARTING QUEUE#")
    start_t = time.perf_counter()
    for iterations in iterations_list:
        queue.put(iterations)
    processes = []
    for i in range(pool_size):
        processes.append(Process(target=worker, args=(queue, )))
        processes[-1].start()
    for process in processes:
        process.join()
    end_t = time.perf_counter()
    print("Queue time: ", end_t - start_t)

    ## Pool ##
    print("#STARTING POOL#")
    start_t = time.perf_counter()
    with Pool(pool_size) as pool:
        results = pool.imap_unordered(run_sims, iterations_list)
        for res in results:
            res
    end_t = time.perf_counter()
    print("Pool time: ", end_t - start_t)

    ## No Multiprocessing - Normal Loop
    print("#STARTING NORMAL LOOP#")
    start_t = time.perf_counter()
    for i in iterations_list:
        run_sims(i)
    end_t = time.perf_counter()
    print("Normal time: ", end_t - start_t)
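One caveat about the worker loop: the multiprocessing documentation notes that `Queue.empty()` is not reliable, so with several workers the `while not queue.empty(): queue.get()` pattern can race (the queue can go empty between the check and the `get()`, leaving a worker blocked forever). A common alternative is to enqueue one sentinel value per worker. This is a minimal sketch of that pattern, reusing the names from the code above with a much smaller workload; it is an illustration, not part of the original answer:

```python
import random
from multiprocessing import Process, Queue

def run_sims(iterations):
    # Same simulation as above, returning the mean of the samples.
    total = 0.0
    for _ in range(iterations):
        total += random.uniform(0, 1)
    return total / iterations

def worker(queue):
    # Block on get() and stop at the sentinel instead of polling empty().
    while True:
        task = queue.get()
        if task is None:  # sentinel: no more work
            break
        run_sims(task)

if __name__ == '__main__':
    pool_size = 2
    queue = Queue()
    for iterations in [100000] * 4:
        queue.put(iterations)
    for _ in range(pool_size):
        queue.put(None)  # one sentinel per worker
    processes = [Process(target=worker, args=(queue,)) for _ in range(pool_size)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print("done")
```

Because the queue is FIFO, the sentinels are only dequeued after all real tasks, so each worker drains work until it sees its own `None` and then exits cleanly.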
