假设我们有一个multiprocessing.Pool
,其中工作线程共享一个multiprocessing.JoinableQueue
,将工作项排入队列并可能排队更多工作:
def worker_main(queue):
while True:
work = queue.get()
for new_work in process(work):
queue.put(new_work)
queue.task_done()
当队列填满时,queue.put()
将阻塞。只要至少有一个进程使用 queue.get()
从队列中读取,它就会释放队列中的空间来解锁写入器。但是所有进程都可能同时阻塞queue.put()
。
有没有办法避免像这样被卡住?
根据process(work)
创建更多项目的频率,在无限最大大小的队列旁边可能没有解决方案。
简而言之,队列必须足够大,以容纳可以随时拥有的整个工作项积压工作。
由于队列是使用信号量实现的,因此可能确实存在 SEM_VALUE_MAX
的硬大小限制,在 MacOS 中为 32767。因此,您需要对该实现进行子类化或使用put(block=False)
并处理queue.Full
(例如,将多余的项目放在其他地方(,如果这还不够的话。
或者,查看 Python 分布式工作项队列的第三方实现之一。