我有两个python脚本,我希望它们能够相互通信。具体来说,如果后者需要,我希望scriptCommunication.py向scriptProcess.py发送一个数组。我使用了模块multiprocessing.Process
和multiprocessing.Pipe
使其工作。我的代码可以工作,但我想优雅地处理SIGINT和SIGTERM,我尝试了以下操作,但它没有优雅地退出:
Process.py
from multiprocessing import Process, Pipe
from Communication import arraySender
import time
import signal
class GracefulKiller:
kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
self.kill_now = True
def main():
parent_conn, child_conn = Pipe()
p = Process(target=arraySender, args=(child_conn,True))
p.start()
print(parent_conn.recv())
if __name__ == '__main__':
killer = GracefulKiller()
while not killer.kill_now:
main()
通讯.py
import numpy
from multiprocessing import Process, Pipe
def arraySender(child_conn, sendData):
if sendData:
child_conn.send(numpy.random.randint(0, high=10, size=15, dtype=int))
child_conn.close()
我做错了什么?
我强烈怀疑您在Windows下运行此程序,因为我认为您的代码应该在Linux下运行。这就是为什么总是用你所在的实际平台来标记你关于Python和多处理的问题是很重要的
问题似乎是由于除了主进程之外,您还在函数main
中创建了一个子进程,该子进程也在接收信号。解决方案通常是将类似signal.signal(signal.SIGINT, signal.SIG_IGN)
的调用添加到array_sender
工作函数中。但这有两个问题:
- 存在竞争条件:子进程可能在更改忽略信号之前接收到信号
- 无论如何,使用
multiprocess.Processing
时忽略信号的调用似乎不起作用(也许该类自己进行信号处理,覆盖这些调用(
解决方案是创建一个多处理池并初始化每个池进程,以便它们在提交任何任务之前忽略信号。使用池的另一个优点是,尽管在这种情况下,我们只需要池大小为1,因为一次运行的任务永远不会超过一个,但您只需要创建一次进程,然后就可以重用。
顺便说一句,通过将类属性kill_now
与执行self.kill_now = True
时创建的实例属性kill_now
混合,GracefulKiller
类中存在一些不一致性。因此,当主进程测试killer.kill_now
时,它将访问类属性,直到self.kill_now
设置为True时,它才会访问实例属性。
from multiprocessing import Pool, Pipe
import time
import signal
import numpy
class GracefulKiller:
def __init__(self):
self.kill_now = False # Instance attribute
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
self.kill_now = True
def init_pool_processes():
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
def arraySender(sendData):
if sendData:
return numpy.random.randint(0, high=10, size=15, dtype=int)
def main(pool):
result = pool.apply(arraySender, args=(True,))
print(result)
if __name__ == '__main__':
# Create pool with only 1 process:
pool = Pool(1, initializer=init_pool_processes)
killer = GracefulKiller()
while not killer.kill_now:
main(pool)
pool.close()
pool.join()
理想情况下GracefulKiller
应该是一个单例类,这样无论进程实例化GracefulKiller
多少次,对于要处理的每种类型的信号,都只能调用signal.signal
一次:
class Singleton(type):
def __init__(self, *args, **kwargs):
self.__instance = None
super().__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
if self.__instance is None:
self.__instance = super().__call__(*args, **kwargs)
return self.__instance
class GracefulKiller(metaclass=Singleton):
def __init__(self):
self.kill_now = False # Instance attribute
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
self.kill_now = True