Python死锁使用线程.线程,多处理.Queue和multiprocessing.Pool



我一直在提炼和调试使用多处理时发生的随机死锁情况。在主线程和多处理池之间排队。它挂起时,多处理。池试图加入。

更具体地说,您有一个额外的线程作为多进程。队列消费者和多处理。池工作线程作为主要生产者,但是当您在启动池工作线程之前将一些消息添加到主线程中的队列中时,会发生死锁。请看下面的脚本:

import multiprocessing as mp
import threading as th
from typing import Optional

log_queue = mp.Queue()

def listner():
while True:
v: Optional[str] = log_queue.get()
if v is None:
break
print(v)

def worker(x: int):
# Waste some time
v: float = 3.141576 * x
for _ in range(10):
v *= 0.92
log_queue.put(f'{v}')

def worker_error(e: BaseException):
print(f'Error: {e}')
return None

def main():
lt = th.Thread(target=listner, name='listener',
args=())
lt.start()
# print('Launched listner thread')
# print('Place some messages in the queue.')
for i in range(500):
log_queue.put(f'|--------------------------------------------------|')
# print('Running workers')
with mp.Pool() as pool:
for i in range(50):
pool.apply_async(
worker,
args=(i,),
error_callback=worker_error,
)
pool.close()
pool.join()
# print('Telling listener to stop ...')
log_queue.put(None)
lt.join()

if __name__ == '__main__':
main()

我开始认为这与多处理有关。在全局作用域中定义的队列。这是由multiprocessing.Pool(有意)继承的。也许是多进程。池试图在队列上运行一些析构函数/清理例程,这会挂起它?

也许它与MP管道和队列部分中的警告有关。直到所有缓冲项都通过队列的管道发送,池才会加入。如果是这样的话,为什么消费者线程不清除队列,因为main被阻塞了?

猜测吗?

虽然我不能100%确定是什么原因导致死锁,因为我不能在任何操作系统上复制它,这可能与操作系统在进程被分叉时没有正确重置队列锁有关,而最好的方法是通过初始化器将队列传递给子进程,而不是依赖于fork复制全局作用域。

import multiprocessing as mp
import threading as th
from typing import Optional
log_queue: Optional[mp.Queue] = None

def listner(log_queue):
while True:
v: Optional[str] = log_queue.get()
if v is None:
break
print(v)

def worker(x: int):
# Waste some time
v: float = 3.141576 * x
for _ in range(10):
v *= 0.92
log_queue.put(f'{v}')

def worker_error(e: BaseException):
print(f'Error: {e}')
return None
def init_pool(log_queue_local):
global log_queue
log_queue = log_queue_local
def main():
log_queue = mp.Queue()
lt = th.Thread(target=listner, name='listener',
args=(log_queue,))
lt.start()
# print('Launched listner thread')
# print('Place some messages in the queue.')
for i in range(500):
log_queue.put(f'|--------------------------------------------------|')
# print('Running workers')
with mp.Pool(initializer=init_pool, initargs=(log_queue,)) as pool:
for i in range(50):
pool.apply_async(
worker,
args=(i,),
error_callback=worker_error,
)
pool.close()
pool.join()
# print('Telling listener to stop ...')
log_queue.put(None)
lt.join()

if __name__ == '__main__':
main()

似乎没有死锁发生在这段代码中,并且它是"正确的"。和";portable"如果你想让你的代码在Windows和macos上工作,这是向子进程发送队列的方法。