从以下代码中,我期望所得列表的长度与多处理为feed的项目之一相同:
import multiprocessing as mp
def worker(working_queue, output_queue):
while True:
if working_queue.empty() is True:
break #this is supposed to end the process.
else:
picked = working_queue.get()
if picked % 2 == 0:
output_queue.put(picked)
else:
working_queue.put(picked+1)
return
if __name__ == '__main__':
static_input = xrange(100)
working_q = mp.Queue()
output_q = mp.Queue()
for i in static_input:
working_q.put(i)
processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
results_bank = []
while True:
if output_q.empty() is True:
break
else:
results_bank.append(output_q.get())
print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
results_bank.sort()
print results_bank
有人知道如何使此代码正确运行吗?
此代码永远不会停止:
每个工人只要没有空,就会从队列中获取一个物品:
picked = working_queue.get()
并为每个获得的新产品放置一个新的:
working_queue.put(picked+1)
因此,队列永远不会空,除非该过程之间的时序恰好是使队列在其中一个过程调用empty()
时为空。因为队列长度最初是100
,并且您拥有与cpu_count()
一样多的过程,如果它在任何现实的系统上停止,我会感到惊讶。
很好地执行代码,证明我是错误的,它确实在某个时候停止了,这实际上使我感到惊讶。使用一个过程执行代码似乎有一个错误,因为一段时间后,该过程冻结但不会返回。随着多个过程的结果,结果是变化的。
在循环迭代中添加短时间的睡眠期使代码的行为如上所述,并在上面解释。Queue.put
,Queue.get
和Queue.empty
之间似乎存在一些时间问题,尽管它们应该是线程安全的。删除empty
测试也给出了预期的结果(没有被卡在空排队中)。
找到了不同行为的原因。排在队列上的物体不会立即冲洗。因此,empty
可能会返回False
,尽管队列中有一些物品正在等待冲洗。
来自文档:
注意:将对象放在队列上时,对象被腌制并 背景线程后来将腌制的数据冲洗给基础 管道。这会带来一些令人惊讶的后果,但是 不应该造成任何实际困难 - 如果他们真的打扰 然后,您可以使用使用经理创建的队列。
将对象放在空排队上后,在队列的空()方法返回false和get_nowait()之前,可能会有无限延迟,而无需提高queue.empty。
如果多个进程是启用对象,则可能在另一端的端外接收对象。但是,同一过程中杰出的对象将始终处于彼此的预期顺序中。