Python 多处理 .join() 死锁依赖于工作线程函数



我正在使用multiprocessingpython 库生成 4 个Process()对象来并行化 CPU 密集型任务。任务(这篇伟大文章的灵感和代码)是计算列表中每个整数的素因数。

main.py:

import random
import multiprocessing
import sys
num_inputs  = 4000
num_procs   = 4
proc_inputs = num_inputs/num_procs
input_list  = [int(1000*random.random()) for i in xrange(num_inputs)]
output_queue = multiprocessing.Queue()
procs        = []
for p_i in xrange(num_procs):
print "Process [%d]"%p_i
proc_list = input_list[proc_inputs * p_i:proc_inputs * (p_i + 1)]
print " - num inputs: [%d]"%len(proc_list)
# Using target=worker1 HANGS on join
p = multiprocessing.Process(target=worker1, args=(p_i, proc_list, output_queue))
# Using target=worker2 RETURNS with success
#p = multiprocessing.Process(target=worker2, args=(p_i, proc_list, output_queue))
procs.append(p)
p.start()
for p in jobs:
print "joining ", p, output_queue.qsize(), output_queue.full()
p.join()
print "joined  ", p, output_queue.qsize(), output_queue.full()
print "Processing complete."
ret_vals = []
while output_queue.empty() == False:
ret_vals.append(output_queue.get())
print len(ret_vals)
print sys.getsizeof(ret_vals)

观察:

  • 如果每个进程的目标是函数worker1,对于大于4000个元素的输入列表,主线程卡在.join()上,等待生成的进程终止并且永远不会返回。
  • 如果每个进程的目标是函数worker2,对于相同的输入列表,代码工作正常,主线程返回。

这让我非常困惑,因为worker1worker2之间的唯一区别(见下文)是前者在Queue中插入单个列表,而后者为每个进程插入单个列表列表。

为什么使用worker1而不使用目标worker2出现死锁? 不应该两者都(或两者都不)超过多处理队列最大大小限制 32767 吗?


工人 1 与工人 2:

def worker1(proc_num, proc_list, output_queue):
'''worker function which deadlocks'''  
for num in proc_list:
output_queue.put(factorize_naive(num))
def worker2(proc_num, proc_list, output_queue):
'''worker function that works'''
workers_stuff = []
for num in proc_list:
workers_stuff.append(factorize_naive(num))
output_queue.put(workers_stuff)

关于SO有很多类似的问题,但我相信这个问题的核心与所有这些问题明显不同。

相关链接:

  • https://sopython.com/canon/82/programs-using-multiprocessing-hang-deadlock-and-never-complete/
  • Python 多处理问题
  • Python 多处理 - 大型队列的进程在加入时挂起
  • Process.join() 和队列不适用于大数字
  • Python 3 在队列为空之前调用 join 时的多处理队列死锁
  • 使用多处理模块的脚本不会终止
  • 为什么进行多处理。Process.join() 挂起?
  • 何时在进程上调用 .join()?
  • Python 多处理模块的 .join() 方法到底在做什么?

文档对此发出警告:

警告:如上所述,如果子进程已将项目放入队列(并且未使用 JoinableQueue.cancel_join_thread),则在所有缓冲项目刷新到管道之前,该进程不会终止。

这意味着,如果您尝试加入该进程,除非您确定已放入队列中的所有项目都已使用,否则可能会遇到死锁。同样,如果子进程是非守护进程,则父进程在尝试加入其所有非守护进程子进程时可能会在退出时挂起。

虽然Queue似乎是无限的,但在后台,排队的项目会缓冲在内存中,以避免进程间管道过载。 在刷新这些内存缓冲区之前,进程无法正常结束。 您的worker1()将比您的worker2()更多的项目放在队列,仅此而已。 请注意,在实现诉诸内存缓冲之前可以排队的项目数未定义:它可能因操作系统和 Python 版本而异。

正如文档所建议的那样,避免这种情况的正常方法是在尝试.join()进程之前将所有项目从队列.get()。 正如您所发现的,是否有必要这样做取决于每个工作进程在队列中放入的项目数,但未定义

相关内容

最新更新