Python multiprocessing -捕获信号以重启子进程或关闭父进程



我正在使用多进程库来生成两个子进程。我想确保只要父进程还活着,如果子进程死亡(接收SIGKILL或SIGTERM),它们就会自动重新启动。另一方面,如果父进程接收到SIGTERM/SIGINT,我希望它终止所有子进程,然后退出。

我是这样处理这个问题的:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN
from functools import partial
import multiprocessing
import setproctitle
class HelloWorld(multiprocessing.Process):
    """Child process that prints "Hello World" once per second, forever."""
    def __init__(self):
        super(HelloWorld, self).__init__()
        # ignore, let parent handle it
        # NOTE(review): __init__ runs in the process that constructs the
        # object (the parent, in __main__), not in the child, so this changes
        # the parent's SIGTERM disposition; __main__ later replaces it with
        # its own handler.
        signal(SIGTERM, SIG_IGN)
    def run(self):
        # run() is the part that executes in the child after start().
        setproctitle.setproctitle("helloProcess")
        while True:
            print "Hello World"
            time.sleep(1)
class Counter(multiprocessing.Process):
    """Child process that prints 1, 2, 3, ... once per second, forever."""
    def __init__(self):
        super(Counter, self).__init__()
        # First value printed by run().
        self.counter = 1
        # ignore, let parent handle it
        # NOTE(review): __init__ runs in the process that constructs the
        # object (the parent), not in the child, so this changes the parent's
        # SIGTERM disposition rather than the child's.
        signal(SIGTERM, SIG_IGN)
    def run(self):
        # run() is the part that executes in the child after start().
        setproctitle.setproctitle("counterProcess")
        while True:
            print self.counter
            time.sleep(1)
            self.counter += 1

def signal_handler(helloProcess, counterProcess, signum, frame):
    """Parent-side handler for SIGINT, SIGTERM, SIGQUIT and SIGCHLD.

    helloProcess and counterProcess are pre-bound with functools.partial in
    __main__; signum and frame are supplied by the signal module. On SIGCHLD
    dead children are restarted; on any other signal the children are
    terminated and the parent exits.
    """
    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess
    # 17 is SIGCHLD's number on Linux.
    # NOTE(review): signal numbers are platform-specific; comparing against
    # the SIGCHLD constant would be portable.
    if signum == 17:
        print "helloProcess: ", helloProcess.is_alive()
        if not helloProcess.is_alive():
            print "Restarting helloProcess"
            # NOTE(review): this rebinds only the local name; the partial
            # registered in __main__ still holds the original object, so the
            # next signal is handled with the stale one.
            helloProcess = HelloWorld()
            helloProcess.start()
        print "counterProcess: ", counterProcess.is_alive()
        if not counterProcess.is_alive():
            print "Restarting counterProcess"
            # Same caveat as above: only the local name is rebound.
            counterProcess = Counter()
            counterProcess.start()
    else:
        # SIGINT / SIGTERM / SIGQUIT: stop whatever is still running, then exit.
        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()
        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()
        sys.exit(0)

if __name__ == '__main__':
    # Start both children first, then install the handler in the parent.
    helloProcess = HelloWorld()
    helloProcess.start()
    counterProcess = Counter()
    counterProcess.start()
    # partial fixes the two current Process objects as the handler's leading
    # arguments; the signal module supplies (signum, frame).
    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))
    # NOTE(review): active_children() returns immediately; it is not a
    # blocking call, so the script body ends here.
    multiprocessing.active_children()

如果我向counterProcess发送SIGKILL,它将正确重启。但是,向helloProcess发送SIGKILL也会重启counterProcess而不是helloProcess?

如果我向父进程发送SIGTERM,父进程将退出,但子进程成为孤儿并继续。我该如何纠正这种行为?

这段代码有几个问题,所以我将依次讨论它们。

如果我向counterProcess发送SIGKILL,它将正确重启。但是,向helloProcess发送SIGKILL也会重启counterProcess而不是helloProcess?

这种特殊的行为很可能是由于主进程中缺少阻塞调用:multiprocessing.active_children()会立即返回,并不是阻塞调用。我无法确切解释该行为的原因,但在__main__块中添加一个阻塞调用,例如

# Keep the main process alive so it is still there to handle signals.
while True:
    time.sleep(1)

即可解决这个问题。

另一个非常严重的问题是你将对象传递给handler的方式:

helloProcess = HelloWorld()
...
# partial captures the objects these names refer to at this moment.
partial(signal_handler, helloProcess, counterProcess)

这种绑定在你于处理函数内部创建新对象之后就过时了:

if not helloProcess.is_alive():
    print "Restarting helloProcess"
    # Rebinds only the handler's local name, not the object held by the partial.
    helloProcess = HelloWorld()
    helloProcess.start()

注意,这两处使用的是指向HelloWorld()对象的不同名称。partial对象绑定的是__main__作用域中的名称当时所指向的对象,而回调函数中的赋值绑定的是其局部作用域中的名称。因此,把新对象赋给局部名称并不会影响回调所绑定的对象(它仍然绑定在__main__作用域中创建的那个对象上)。

你可以通过在回调范围内用新对象重新绑定你的信号回调来修复它:

def signal_handler(...):
    ...
    # Re-register the handler so the partial captures the newly created objects.
    for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
        signal(signame, partial(signal_handler, helloProcess, counterProcess))
    ...

然而,这会导致另一个陷阱:现在每个新建的子进程都会从父进程继承该回调,并在每次收到信号时调用它。要解决这个问题,可以在创建子进程之前将信号处理程序临时恢复为默认值(SIG_DFL):

# Restore default dispositions before starting a new child, so it does not
# inherit the parent's handler.
for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
    signal(signame, SIG_DFL)

最后,在终止子进程之前,您可能希望忽略来自子进程的SIGCHLD信号,否则它们会再次触发回调:

# Keep child deaths from re-entering the handler while shutting down.
signal(SIGCHLD, SIG_IGN)

请注意,您可能希望重新设计应用程序的体系结构,并利用multiprocessing提供的一些功能。

最终代码:

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_IGN, SIG_DFL
from functools import partial
import multiprocessing
#import setproctitle
class HelloWorld(multiprocessing.Process):
    """Child process that prints a greeting once per second, forever."""

    def __init__(self):
        multiprocessing.Process.__init__(self)

    def run(self):
        """Body executed in the child process after start()."""
        while 1:
            print("Hello World")
            time.sleep(1)
class Counter(multiprocessing.Process):
    """Child process that prints 1, 2, 3, ... once per second, forever."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.counter = 1  # first value printed by run()

    def run(self):
        """Body executed in the child: print, wait a second, advance."""
        while 1:
            print(self.counter)
            time.sleep(1)
            self.counter += 1

def signal_handler(helloProcess, counterProcess, signum, frame):
    print multiprocessing.active_children()
    print "helloProcess: ", helloProcess
    print "counterProcess: ", counterProcess
    print "current_process: ", multiprocessing.current_process()
    if signum == 17:
        # Since each new child inherits current signal handler,
        # temporarily set it to default before spawning new child.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, SIG_DFL)
        print "helloProcess: ", helloProcess.is_alive()
        if not helloProcess.is_alive():
            print "Restarting helloProcess"
            helloProcess = HelloWorld()
            helloProcess.start()
        print "counterProcess: ", counterProcess.is_alive()
        if not counterProcess.is_alive():
            print "Restarting counterProcess"
            counterProcess = Counter()
            counterProcess.start()
        # After new children are spawned, revert to old signal handling policy.
        for signame in [SIGINT, SIGTERM, SIGQUIT, SIGCHLD]:
            signal(signame, partial(signal_handler, helloProcess, counterProcess))

    else:
        # Ignore any signal that child communicates before quit   
        signal(SIGCHLD, SIG_IGN) 
        if helloProcess.is_alive():
            print "Stopping helloProcess"
            helloProcess.terminate()
        if counterProcess.is_alive():
            print "Stopping counterProcess"
            counterProcess.terminate()
        sys.exit(0)

if __name__ == '__main__':
    helloProcess = HelloWorld()
    helloProcess.start()
    counterProcess = Counter()
    counterProcess.start()
    # The children are started before the handler is installed, so they do
    # not inherit it; partial pre-binds the two child objects.
    for signame in (SIGINT, SIGTERM, SIGQUIT, SIGCHLD):
        signal(signame, partial(signal_handler, helloProcess, counterProcess))
    # Keep the parent alive (reporting its children once a second) so it is
    # around to receive and handle signals.
    while 1:
        print(multiprocessing.active_children())
        time.sleep(1)

要从signal.SIGCHLD处理程序中重新创建已死的子进程,母进程必须调用os.wait函数之一,因为Process.is_alive在这里不起作用。
虽然这是可能的,但它很复杂,因为只要子进程的状态发生变化,母进程就会收到signal.SIGCHLD,例如子进程收到signal.SIGSTOP、signal.SIGCONT或任何终止信号时。因此,signal.SIGCHLD处理程序必须区分子进程的这些状态。如果在收到signal.SIGCHLD时只是简单地重建子进程,可能会创建出多余的子进程。

下面的代码使用带os.WNOHANG标志的os.waitpid使其不阻塞,并使用os.WUNTRACED和os.WCONTINUED来判断signal.SIGCHLD是否来自signal.SIGSTOP或signal.SIGCONT。
注意:如果在调用os.waitpid之前打印过任何一个Process实例(即调用了str(Process())),os.waitpid将不起作用,只会返回(0, 0)。这很可能是因为Process的字符串表示会查询子进程的状态,从而提前回收了它。

import sys
import time
from signal import signal, pause, SIGINT, SIGTERM, SIGQUIT, SIGCHLD, SIG_DFL
import multiprocessing
import os
class HelloWorld(multiprocessing.Process):
    """Child process that greets once per second until it is terminated."""

    def run(self):
        """Body executed in the child process."""
        # A child restarted from the mother's SIGCHLD handler inherits the
        # mother's SIGTERM handler; restore the default action so that
        # Process.terminate(), which sends SIGTERM, actually stops the child.
        signal(SIGTERM, SIG_DFL)
        while 1:
            print("Hello World")
            time.sleep(1)
class Counter(multiprocessing.Process):
    """Child process that prints an increasing count, one number per second."""

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.counter = 1  # first value printed by run()

    def run(self):
        """Body executed in the child process."""
        # A child restarted from the mother's SIGCHLD handler inherits the
        # mother's SIGTERM handler; restore the default action so that
        # Process.terminate(), which sends SIGTERM, actually stops the child.
        signal(SIGTERM, SIG_DFL)
        while 1:
            print(self.counter)
            time.sleep(1)
            self.counter += 1

def signal_handler(signum, _):
    """Parent-side handler for SIGINT, SIGTERM, SIGQUIT and SIGCHLD.

    On SIGCHLD, reap every child whose state changed and restart each one
    that exited or was killed; stop/continue notifications are skipped.
    On any other signal, terminate the live children and exit with status 0.

    The current child objects live in the module globals helloProcess and
    counterProcess, which this handler rebinds when it restarts a child.
    """
    global helloProcess, counterProcess
    if signum == SIGCHLD:
        import errno
        # Standard signals are not queued: if several children change state
        # before the handler runs, only one SIGCHLD is delivered. Keep reaping
        # until nothing is pending so no dead child is missed.
        while True:
            try:
                pid, status = os.waitpid(
                    -1, os.WNOHANG | os.WUNTRACED | os.WCONTINUED)
            except OSError as exc:
                if exc.errno != errno.ECHILD:
                    raise
                # No children left to wait for.
                break
            if pid == 0:
                # Children exist, but none has a pending state change.
                break
            if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
                # Stopped or resumed, not dead: nothing to restart.
                continue
            if os.WIFSIGNALED(status) or os.WIFEXITED(status):
                if helloProcess.pid == pid:
                    print("Restarting helloProcess")
                    helloProcess = HelloWorld()
                    helloProcess.start()
                elif counterProcess.pid == pid:
                    print("Restarting counterProcess")
                    counterProcess = Counter()
                    counterProcess.start()
    else:
        # mother shouldn't be notified when it terminates children
        signal(SIGCHLD, SIG_DFL)
        if helloProcess.is_alive():
            print("Stopping helloProcess")
            helloProcess.terminate()
        if counterProcess.is_alive():
            print("Stopping counterProcess")
            counterProcess.terminate()
        sys.exit(0)
if __name__ == '__main__':
    helloProcess = HelloWorld()
    helloProcess.start()
    counterProcess = Counter()
    counterProcess.start()
    # One handler serves every signal: restart on SIGCHLD, shut down otherwise.
    # It is installed after the first two children have been started.
    for signame in (SIGINT, SIGTERM, SIGQUIT, SIGCHLD):
        signal(signame, signal_handler)
    # pause() sleeps until a signal is handled; loop to wait for the next one.
    while 1:
        pause()

下面的代码在不使用signal.SIGCHLD的情况下重新创建已死亡的子进程,因此更简单。
创建两个子进程后,母进程为SIGINT、SIGTERM、SIGQUIT设置了名为term_child的信号处理程序。term_child被调用时会终止每个子进程并对其调用join。

母进程继续检查子进程是否活着,并在必要时在while循环中重新创建它们。

因为(重新创建的)子进程会继承母进程的信号处理程序,所以应在子进程中把SIGTERM的处理程序重置为默认值(SIG_DFL),这样Process.terminate才能正常工作。

import sys
import time
from signal import signal, SIGINT, SIGTERM, SIGQUIT
import multiprocessing
class HelloWorld(multiprocessing.Process):
    """Child process that prints "Hello World" once per second, forever."""

    def run(self):
        """Body executed in the child process."""
        # Imported locally: this script's module-level `from signal import ...`
        # line does not include SIG_DFL, so the next statement raised
        # NameError and the child died immediately (only to be recreated, and
        # crash again, by the supervision loop in __main__).
        from signal import SIG_DFL
        # Children recreated by the supervision loop inherit the mother's
        # term_child handler for SIGTERM; restore the default action so that
        # Process.terminate(), which sends SIGTERM, really stops the child.
        signal(SIGTERM, SIG_DFL)
        while True:
            print("Hello World")
            time.sleep(1)
class Counter(multiprocessing.Process):
    """Child process that prints 1, 2, 3, ... once per second, forever."""

    def __init__(self):
        super(Counter, self).__init__()
        self.counter = 1  # first value printed by run()

    def run(self):
        """Body executed in the child process."""
        # Imported locally: this script's module-level `from signal import ...`
        # line does not include SIG_DFL, so the next statement raised
        # NameError and the child died immediately (only to be recreated, and
        # crash again, by the supervision loop in __main__).
        from signal import SIG_DFL
        # Children recreated by the supervision loop inherit the mother's
        # term_child handler for SIGTERM; restore the default action so that
        # Process.terminate(), which sends SIGTERM, really stops the child.
        signal(SIGTERM, SIG_DFL)
        while True:
            print(self.counter)
            time.sleep(1)
            self.counter += 1
def term_child(_, __):
    """Mother's handler for SIGINT/SIGTERM/SIGQUIT: stop all children, then exit.

    The signal number and frame arguments are unused. Each child is
    terminated and joined before the next one is handled; the mother then
    exits with status 0.
    """
    for proc in children:
        proc.terminate()  # sends SIGTERM to the child
        proc.join()       # wait for it, so it does not linger as a zombie
    sys.exit(0)
if __name__ == '__main__':
    children = [HelloWorld(), Counter()]
    for child in children:
        child.start()
    # Installed only after the first children are running, so those two do
    # not inherit it.
    for signame in (SIGINT, SIGTERM, SIGQUIT):
        signal(signame, term_child)
    # Supervision loop: once a second, replace every dead child with a fresh
    # instance of the same class and start it.
    while True:
        for slot, proc in enumerate(children):
            if proc.is_alive():
                continue
            children[slot] = proc.__class__()
            children[slot].start()
        time.sleep(1)

最新更新