多进程.进程未在空队列上退出



我有大量的对象需要迭代,我认为多处理会大大加快工作速度。然而,一旦我增加了核心数量,这个简单的例子似乎就挂起了。

它挂在p.join()行,如果我终止并检查,q_in.empty()将返回True,并且输出队列具有适当数量的项。

是什么导致它挂起的?

from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while not q_in.empty():
# Simple code standing in for more complex operation
q_out.put(str(w) + '_' + str(q_in.get()))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5

您有多个进程从队列中提取。队列可能有数据,但当您开始获取数据时,另一个进程已经消耗掉了数据。多处理。Queue.empty说由于多线程/多处理语义,这是不可靠的

另一种方法是在队列的末尾放置一个进程结束哨兵,每个进程一个。当进程看到哨兵时,它退出。在您的情况下,None是一个不错的选择。

from multiprocessing import Process, Queue
import time
def worker_func(q_in, q_out, w):
time.sleep(1)
while True:
msg = q_in.get()
if msg is None:
break
q_out.put(str(w) + '_' + str(msg))
def setup_func(x):
q_in = Queue()
for w in range(x):
q_in.put(w)
q_out = Queue()
return((q_in, q_out))
def test_func(num_cores, q_in, q_out):
processes = []
for w in range(num_cores):
q_in.put(None)
p = Process(target=worker_func, args=(q_in, q_out, w))
processes.append(p)
p.start()
for p in processes:
p.join()
output_ls = []
while not q_out.empty():
output_ls.append(q_out.get())
return(output_ls)
q_in, q_out = setup_func(1000)
test_func(1, q_in, q_out) # This returns without issue for num_cores = 1 or 2
q_in, q_out = setup_func(1000)
test_func(5, q_in, q_out) # This hangs for num_cores = 5

最新更新