我正在使用python的多处理来分析一些大型文本。几天后,我试图弄清楚为什么我的代码挂起(即进程没有结束(,我能够使用以下简单代码重现问题:
import multiprocessing as mp
for y in range(65500, 65600):
print(y)
def func(output):
output.put("a"*y)
if __name__ == "__main__":
output = mp.Queue()
process = mp.Process(target = func, args = (output,))
process.start()
process.join()
如您所见,如果要放入队列中的项目变得太大,则进程就会挂起。 它不会冻结,如果我在output.put()
后编写更多代码,它就会运行,但这个过程永远不会停止。
当字符串达到 65500 个字符时,这种情况开始发生,具体取决于您的解释器,它可能会有所不同。
我知道mp.Queue
有一个maxsize
论点,但是通过一些搜索,我发现它是关于队列的项目数量的大小,而不是项目本身的大小。
有没有办法解决这个问题? 我需要在原始代码中放入队列中的数据非常非常大......
您的队列已满,没有消费者可以清空它。
从Queue.put
的定义:
如果可选参数块为 True(默认值(且超时为 None (默认值(,则在必要时进行阻止,直到有可用插槽可用。
假设生产者和使用者之间不存在死锁(并且假设您的原始代码确实有使用者,因为您的示例没有(,最终生产者应该被解锁并终止。检查您的消费者的代码(或将其添加到问题中,以便我们查看(
更新
这不是问题所在,因为队列尚未获得最大大小,因此放置应该会成功,直到内存不足。
这不是队列的行为。正如这张票证中所阐述的,这里的阻塞部分不是队列本身,而是底层管道。从链接的资源("[]"之间的插入是我的(:
队列的工作方式如下: - 当你调用 queue.put(data( 时,数据被添加到一个 deque,它可以永远增长和收缩 - 然后线程从 deque 中弹出元素,并发送它们,以便其他进程可以通过管道或 Unix 套接字(通过套接字对创建(接收它们。但是,这是重要的一点,管道和 unix 套接字的容量都是有限的(曾经是 4k - 页面大小 - 在较旧的 Linux 内核上用于管道,现在是 64k,对于 unix 套接字在 64k-120k 之间,具体取决于可调系统(。 - 当你做 queue.get(( 时,你只需在管道/套接字上做一个读取
[..] 当大小 [变得太大] 时,写入线程会阻塞写入系统调用。 而且由于在取消项目之前执行了联接 [注意:这是您的
process.join
],因此您只是死锁,因为联接等待发送线程完成,并且由于管道/套接字已满,写入无法完成! 如果在等待提交者进程之前取消项目排队,则一切正常。
更新 2
我理解。但是我实际上没有消费者(如果它是我想的那样(,我只会在进程完成将其放入队列时从队列中获取结果。
是的,这就是问题所在。multiprocessing.Queue
不是存储容器。您应该专门使用它在"生产者"(生成进入队列的数据的进程(和"消费者"("使用"该数据的进程(之间传递数据。如您现在所知,将数据留在那里是一个坏主意。
如果我甚至不能先把它放在那里,我怎么能从队列中获取项目?
put
和get
隐藏了将数据放在一起的问题,如果它填满了管道,所以你只需要在你的"主"进程中设置一个循环,将项目get
出队列,例如,将它们附加到列表中。该列表位于主进程的内存空间中,不会堵塞管道。