管理和终止任何流程的稳健方式



我正在编写并行运行实验的代码。我无法控制实验的作用,它们可能会打开并使用subprocess.Popencheck_output来运行一个或多个额外的子进程。

我有两个条件:我想能够杀死超过一段时间的实验,我想杀死KeyboardInterrupt上的实验。

大多数终止流程的方法并不能确保所有子流程等都被终止。如果100个实验一个接一个地运行,但它们都产生了子进程,这些子进程在超时后仍然存在,并且实验被认为是被终止的,那么这显然是一个问题。

我现在处理这个问题的方式是包括将实验配置存储在数据库中的代码,从命令行生成加载和运行实验的代码,然后通过subprocess.Popen(cmd, shell=True, start_new_session=True)调用这些命令,并在超时时使用os.killpg杀死它们。

我的主要问题是:通过命令行调用这些实验感觉很麻烦,那么有没有办法直接通过multiprocessing.Process(target=fn)调用代码,并在超时时达到start_new_session=True+os.killpgKeyboardInterrupt相同的效果?

<file1>
def run_exp(config):
do work
return result
if __name__ == "__main__":
save_exp(run_exp(load_config(sys.args)))
<file2>
def monitor(queue):
active = set()  # active process ids
while True:
msg = queue.get()
if msg == "sentinel":
<loop over active ids and kill them with os.killpg>
else:
<add or remove id from active set>

def worker(args):
id, queue = args
command = f"python <file1> {id}"
with subprocess.Popen(command, shell=True, ..., start_new_session=True) as process:
try:
queue.put(f"start {process.pid}")
process.communicate(timeout=timeout)
except TimeoutExpired:
os.killpg(process.pid, signal.SIGINT)  # send signal to the process group
process.communicate()
finally:
queue.put(f"done {process.pid}")
def main():
<save configs => c_ids>
queue = manager.Queue()
process = Process(target=monitor, args=(queue,))
process.start()
def clean_exit():
queue.put("sentinel")
<terminate pool and monitor process>
r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
atexit.register(clean_exit)
r.wait()
<terminate pool and monitor process>

我发布了一个代码框架,详细介绍了通过命令行启动进程并杀死它们的方法。我的方法的那个版本的另一个复杂之处是,当KeyboardInterrupt到达时,队列已经终止(因为缺少更好的词),并且无法与监视器进程通信(sentinel消息永远不会到达)。相反,我不得不将进程ID写入文件,并在主进程中读取回文件,以杀死仍在运行的进程。如果你知道解决这个队列问题的方法,我很想了解它

我认为问题是你存储了子进程pid来杀死它——你需要主机进程pid,而你使用了signal.SIGINT,我认为应该是signal.SIGTERM。试试这个,而不是这行:

os.killpg(process.pid, signal.SIGINT)

使用这行:

os.killpg(os.getpgid(process.pid), signal.SIGTERM) 

我想有一种方法可以避免这种情况,那就是使用Try-catch块
假设KeyboardInterrupt到达main(),那么您可以尝试以下操作:

def main():
try:
<save configs => c_ids>
queue = manager.Queue()
process = Process(target=monitor, args=(queue,))
process.start()
def clean_exit():
queue.put("sentinel")
<terminate pool and monitor process>
r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
atexit.register(clean_exit)
r.wait()
<terminate pool and monitor process>
except KeyboardInterrupt as e:
pass
#write the process you want to keep continuing. 

我想这会有帮助的。

最新更新