从stackoverflow上的几个帖子中,我创建了这段代码。
场景
我想要一个多访问队列,其中有几个工作人员"监听"
在键盘中断的情况下,主进程不应该再将新项目放入队列中,并且在sentinel对象的帮助下,工作程序应该优雅地停止。
问题
我使用的当前版本的问题
signal.signal(signal.SIGINT, signal.SIG_IGN)
忽略Ctrl+C是指它也被主进程忽略。
有什么想法吗?我需要使用多处理工作池吗?一些例子表明我可能不得不这样做。那么我还能使用队列吗?
from multiprocessing import Pool, Process,Queue
import time
import signal
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process
class Worker(Process):
def __init__(self, queue,ident):
super(Worker, self).__init__()
# Ignore Signals
signal.signal(signal.SIGINT, signal.SIG_IGN)
self.queue= queue
self.idstr= str(ident)
print "Ident" + self.idstr
def run(self):
print 'Worker started'
# do some initialization here
print 'Computing things!'
for data in iter( self.queue.get, None ):
print "#" + self.idstr + " : " + str(data)
time.sleep(5)
print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())
print "Worker Done"
#### Main ####
request_queue = Queue(10)
for i in range(4):
Worker( request_queue,i ).start()
try:
for data in range(1000000):
request_queue.put( data )
#print "Queue Size: " + str(request_queue.qsize())
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
request_queue.put( None )
except KeyboardInterrupt:
print "Caught KeyboardInterrupt, terminating workers"
while request_queue.empty()==False:
request_queue.get()
request_queue.put( None )
基于您的解决方案(这很好),我添加了一层额外的保护层,以防主代码没有响应,用户取消两次:
global STOP
import os, signal
def signal_handler(sig, frame):
global STOP
if STOP:
signal.signal(signal.SIGINT, signal.SIG_IGN)
os.kill(os.getpid(), signal.SIGTERM)
STOP = True
signal.signal(signal.SIGINT, signal_handler)
我想我找到了一个解决方案。尽管如此,我还是不喜欢我从main得到1次SIGINT,从Worker得到4次,但也许我不得不接受这一点。
-
我为中断信号指定了一个信号处理程序。
-
在接收到第一个Sig INT之后,我忽略更多的Sig INT信号
-
我将停止标志切换为TRUE
-
我打破了队列插入循环
-
我调用停止功能,它清除队列并插入的停止哨兵
from multiprocessing import Pool, Process,Queue import time import signal # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process # Stop Flag for loop stop = False # Define SIGINT def signal_handler(sig, frame): print 'You pressed Ctrl+C!' global stop stop = True # Ignore more Ctrl+C signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal_handler) def stopSentinel(request_queue): print "CTRL Stop Queue and insert None" # Empty Existing Queue while request_queue.empty()==False: request_queue.get() # Put One None for each Worker for i in range(4): request_queue.put( None ) class Worker(Process): def __init__(self, queue,ident): super(Worker, self).__init__() self.queue= queue self.idstr= str(ident) print "Ident" + self.idstr def run(self): print 'Worker started' # do some initialization here print 'Computing things!' for data in iter( self.queue.get, None ): print "#" + self.idstr + " : " + str(data) time.sleep(5) print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize()) print "Worker Done" #### Main ##### request_queue = Queue(10) for i in range(4): Worker( request_queue, i ).start() #### Fill Queue with Data #### for data in range(1000000): request_queue.put( data ) #print "Queue Size: " + str(request_queue.qsize()) # Sentinel objects to allow clean shutdown: 1 per worker. # Check for Stop print "Check Breakout" if stop == True: print "Stop Break" break if stop == True: stopSentinel(request_queue) else: print "Normal Stop" for i in range(4): request_queue.put( None )