我正在尝试在Windows 10上设置多处理池。
基本上一些 cpu(在我的例子中为 12(应该从Qin
读取并将结果写入Qout
.在Qin
中写入'end'
时,该过程应停止。
由于某种原因,进程挂起。
我开发了一个简单的版本:
from multiprocessing import Pool, Queue, Event
import os,time
def worker( Qin, Qout, event):
time.sleep(5)
while True:
item = Qin.get()
if item == 'end':
event.set()
else:
Qout.put(item)
time.sleep(1)
def manager():
Qin,Qout,event= Queue(), Queue(), Event()
processes = os.cpu_count()
pool = Pool(processes=processes)
for _ in range(processes):
pool.apply_async(worker,args= (Qin,Qout,event,))
for i in range(100):
print(i)
Qin.put(i)
Qin.put('end')
pool.close()
event.wait()
pool.terminate()
return Qout
Qout = manager()
你需要了解异步编程在python中是如何正确工作的。当你调用apply_async你会得到未来对象。python 中的队列实现依赖于系统管道将数据从一个进程传输到另一个进程,并依靠一些信号量来保护此管道上的读取和写入。
from multiprocessing import Pool, Queue, Event
import os
import time
import multiprocessing
def worker( Qin, Qout, event):
print('worker')
time.sleep(1)
event.set()
def manager():
processes = multiprocessing.cpu_count()
m = multiprocessing.Manager()
Qin = m.Queue()
Qout = m.Queue()
event = m.Event()
pool = Pool(processes=processes)
result = pool.apply_async(worker, (Qin, Qout, event))
result.get()
pool.close()
event.wait()
return Qout
if __name__ == '__main__':
Qout = manager()
我认为您的代码挂起的原因是因为所有工作任务最终都会等待某些内容同时出现在带有item = Qin.get()
行的输入队列上,因为get()
"块"等待将某些内容放入队列中。避免这种情况的一种方法是改用非阻塞get_nowait()
方法。这样做需要代码处理它可能引发的任何Empty
异常,但它避免在该点有效地停止该进程中的任何进一步执行。
同样,为了使事情正常工作,您需要创建并使用一个multiprocessing.Manager
,该创建一个服务器进程,该进程保存Python对象并允许其他进程通过代理操作它们。请参阅文档的"进程之间的共享状态"部分的"服务器进程"部分。
此外,在 Windows 上使用multiprocessing
时,通过将主进程的代码放在if __name__ == '__main__':
语句中来确保有条件地执行主进程的代码非常重要。这是因为模块在该平台上的实现方式 - 否则每次启动另一个并发任务(这涉及由它们import
(时,代码将再次执行。
下面是您的代码,其中包含所需的修改,因此它使用multiprocessing.Manager
。注意 我更改了manager()
函数的名称,以避免与用于创建共享对象的multiprocessing.Manager
混淆。
import multiprocessing
from queue import Empty as QueueEmpty
import os
import time
END_MARKER = 'end'
def worker(id, Qin, Qout, event):
while True:
try:
item = Qin.get_nowait() # Non-blocking.
except QueueEmpty:
if event.is_set(): # Last item seen?
break
continue # Keep polling.
if item == END_MARKER: # Last item?
event.set()
break # Quit.
Qout.put('{} via worker {}'.format(item, id))
time.sleep(.25)
def pool_manager():
processes = os.cpu_count()
pool = multiprocessing.Pool(processes=processes)
manager = multiprocessing.Manager()
Qin, Qout, event = manager.Queue(), manager.Queue(), manager.Event()
for i in range(100):
Qin.put(i)
Qin.put(END_MARKER)
for id in range(processes):
pool.apply_async(worker, (id, Qin, Qout, event))
pool.close() # Done adding tasks.
pool.join() # Wait for all tasks to complete.
return Qout
if __name__ == '__main__':
print('Processing')
Qout = pool_manager()
print('Contents of Qout:')
while not Qout.empty():
item = Qout.get()
print(' ', item)
print('End of script')