Catch Keyboard Interrupt可停止Python多处理工作程序处理队列



从stackoverflow上的几个帖子中,我创建了这段代码。

场景

我想要一个多访问队列,其中有几个工作人员"监听"

在键盘中断的情况下,主进程不应该再将新项目放入队列中,并且在sentinel对象的帮助下,工作程序应该优雅地停止。

问题

我使用的当前版本的问题

signal.signal(signal.SIGINT, signal.SIG_IGN) 

忽略Ctrl+C是指它也被主进程忽略。

有什么想法吗?我需要使用多处理工作池吗?一些例子表明我可能不得不这样做。那么我还能使用队列吗?

from multiprocessing import Pool, Process,Queue
import time
import signal
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
# http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process

class Worker(Process):
    def __init__(self, queue,ident):
        super(Worker, self).__init__()
        # Ignore Signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.queue= queue
        self.idstr= str(ident)
        print "Ident" + self.idstr
    def run(self):
        print 'Worker started'
        # do some initialization here
        print 'Computing things!'
        for data in iter( self.queue.get, None ):
            print "#" + self.idstr + " : " + str(data)
            time.sleep(5)
            print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())
        print "Worker Done"
#### Main ####
request_queue = Queue(10)
for i in range(4):
    Worker( request_queue,i ).start()
try:
    for data in range(1000000):
        request_queue.put( data )
        #print "Queue Size: " + str(request_queue.qsize())
        # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(4):
        request_queue.put( None ) 
except KeyboardInterrupt:
    print "Caught KeyboardInterrupt, terminating workers"
    while  request_queue.empty()==False:
         request_queue.get()
    request_queue.put( None )    

基于您的解决方案(这很好),我添加了一层额外的保护层,以防主代码没有响应,用户取消两次:

global STOP
import os, signal
def signal_handler(sig, frame):
    global STOP
    if STOP:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.kill(os.getpid(), signal.SIGTERM)
    STOP = True
signal.signal(signal.SIGINT, signal_handler)

我想我找到了一个解决方案。尽管如此,我还是不喜欢我从main得到1次SIGINT,从Worker得到4次,但也许我不得不接受这一点。

  1. 我为中断信号指定了一个信号处理程序。

  2. 在接收到第一个Sig INT之后,我忽略更多的Sig INT信号

  3. 我将停止标志切换为TRUE

  4. 我打破了队列插入循环

  5. 我调用停止功能,它清除队列并插入的停止哨兵

    from multiprocessing import Pool, Process,Queue
    import time
    import signal
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Queue
    # http://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.Process
    # Stop Flag for loop
    stop = False
    # Define SIGINT
    def signal_handler(sig, frame):
        print 'You pressed Ctrl+C!'
        global stop
        stop = True
        # Ignore more Ctrl+C
        signal.signal(signal.SIGINT, signal.SIG_IGN) 
    signal.signal(signal.SIGINT, signal_handler)
    def stopSentinel(request_queue):
        print "CTRL Stop Queue and insert None"
        # Empty Existing Queue
        while  request_queue.empty()==False:
            request_queue.get()
        # Put One None for each Worker
        for i in range(4):
            request_queue.put( None ) 
    
    class Worker(Process):
        def __init__(self, queue,ident):
            super(Worker, self).__init__()
            self.queue= queue
            self.idstr= str(ident)
            print "Ident" + self.idstr
        def run(self):
            print 'Worker started'
            # do some initialization here
            print 'Computing things!'
            for data in iter( self.queue.get, None ):
                print "#" + self.idstr + " : " + str(data)
                time.sleep(5)
                print "#" + self.idstr + "Queue Size: " + str(self.queue.qsize())
            print "Worker Done"
    
    #### Main #####
    request_queue = Queue(10)
    for i in range(4):
        Worker( request_queue, i ).start()
    #### Fill Queue with Data ####
    for data in range(1000000):
        request_queue.put( data )
        #print "Queue Size: " + str(request_queue.qsize())
        # Sentinel objects to allow clean shutdown: 1 per worker.
        # Check for Stop
        print "Check Breakout"
        if stop == True:
            print "Stop Break"
            break
    if stop == True:
        stopSentinel(request_queue)
    else:       
        print "Normal Stop" 
        for i in range(4):
            request_queue.put( None ) 
    

最新更新