避免由于多处理的队列溢出而导致的死锁.可加入队列



假设我们有一个multiprocessing.Pool,其中工作线程共享一个multiprocessing.JoinableQueue,将工作项排入队列并可能排队更多工作:

def worker_main(queue):
    while True:
        work = queue.get()
        for new_work in process(work):
            queue.put(new_work)
        queue.task_done()

当队列填满时,queue.put()将阻塞。只要至少有一个进程使用 queue.get() 从队列中读取,它就会释放队列中的空间来解锁写入器。但是所有进程都可能同时阻塞queue.put()

有没有办法避免像这样被卡住?

根据process(work)创建更多项目的频率,在无限最大大小的队列旁边可能没有解决方案。

简而言之,队列必须足够大,以容纳可以随时拥有的整个工作项积压工作。


由于队列是使用信号量实现的,因此可能确实存在 SEM_VALUE_MAX 的硬大小限制,在 MacOS 中为 32767。因此,您需要对该实现进行子类化或使用put(block=False)并处理queue.Full(例如,将多余的项目放在其他地方(,如果这还不够的话。

或者,查看 Python 分布式工作项队列的第三方实现之一。

最新更新