Python多处理的输出队列提供的结果超过了预期



从以下代码中,我期望所得列表的长度与多处理为feed的项目之一相同:

import multiprocessing as mp
def worker(working_queue, output_queue):
    while True:
        if working_queue.empty() is True:
            break #this is supposed to end the process.
        else:
            picked = working_queue.get()
            if picked % 2 == 0: 
                output_queue.put(picked)
            else:
                working_queue.put(picked+1)
    return
if __name__ == '__main__':
    static_input = xrange(100)    
    working_q = mp.Queue()
    output_q = mp.Queue()
    for i in static_input:
        working_q.put(i)
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    results_bank = []
    while True:
        if output_q.empty() is True:
            break
        else:
            results_bank.append(output_q.get())
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed.
    results_bank.sort()
    print results_bank

有人知道如何使此代码正确运行吗?

此代码永远不会停止:

每个工人只要没有空,就会从队列中获取一个物品:

picked = working_queue.get()

并为每个获得的新产品放置一个新的:

working_queue.put(picked+1)

因此,队列永远不会空,除非该过程之间的时序恰好是使队列在其中一个过程调用empty()时为空。因为队列长度最初是100,并且您拥有与cpu_count()一样多的过程,如果它在任何现实的系统上停止,我会感到惊讶。

很好地执行代码,证明我是错误的,它确实在某个时候停止了,这实际上使我感到惊讶。使用一个过程执行代码似乎有一个错误,因为一段时间后,该过程冻结但不会返回。随着多个过程的结果,结果是变化的。

在循环迭代中添加短时间的睡眠期使代码的行为如上所述,并在上面解释。Queue.putQueue.getQueue.empty之间似乎存在一些时间问题,尽管它们应该是线程安全的。删除empty测试也给出了预期的结果(没有被卡在空排队中)。

找到了不同行为的原因。排在队列上的物体不会立即冲洗。因此,empty可能会返回False,尽管队列中有一些物品正在等待冲洗。

来自文档:

注意:将对象放在队列上时,对象被腌制并 背景线程后来将腌制的数据冲洗给基础 管道。这会带来一些令人惊讶的后果,但是 不应该造成任何实际困难 - 如果他们真的打扰 然后,您可以使用使用经理创建的队列。

  1. 将对象放在空排队上后,在队列的空()方法返回false和get_nowait()之前,可能会有无限延迟,而无需提高queue.empty。

  2. 如果多个进程是启用对象,则可能在另一端的端外接收对象。但是,同一过程中杰出的对象将始终处于彼此的预期顺序中。

相关内容

最新更新