我遇到了一个多进程问题:
class PythonHelper(object):
@staticmethod
def run_in_parallel(*functions):
processes=list()
for function in functions:
process=Process(target=function)
process.start()
processes.append(process)
for process in processes:
process.join()
我使用上面的静态方法来同时运行几个函数(将它们组合在一个进程中)。一切都很好,直到我遇到需要在'subprocess'之一被终止时强制进程终止。
例如:from PythonHelper import PythonHelper as ph
from Recorder import Recorder
class Logger(object):
def run_recorder_proc(self):
rec=Recorder()
rec.record_video()
def run_printer_proc(self):
#hypothetical function: execution takes a long time
for i in range(9000000):
print("number: {}".format(i))
def run_logger(self):
ph.run_in_parallel(self.run_printer_proc,self.run_recorder_proc)
self.run_printer_proc和self.run_recorder_proc是我的子进程。当其中一个子进程完成时,如何"杀死"剩余子进程?
编辑:完整源代码:
class PythonHelper(object):
@staticmethod
#with your fix
def run_in_parallel(*functions):
processes={}
for function in functions:
process=Process(target=function)
process.start()
processes[process.pid]=process
# wait for any process to complete
pid, status = os.waitpid(-1, 0)
# one process terminated
# join it
processes[pid].join()
del processes[pid]
# terminate the rest
for process in processes.values():
process.terminate()
for process in processes.values():
process.join()
class Logger(object):
def run_numbers_1(self):
for i in range(900000):
print("number: {}".format(i))
def run_numbers_2(self):
for i in range(100000):
print("number: {}".format(i))
def run_logger(self):
ph.run_in_parallel(self.run_numbers_1,self.run_numbers_2)
if __name__=="__main__":
logger=Logger()
logger.run_logger()
根据上面的例子,我想强制终止run_numbers_1当run_numbers_2完成
您可以通过稍微更改run_in_parallel()
来实现:
def run_in_parallel(*functions):
processes={}
for function in functions:
process=Process(target=function)
process.start()
processes[process.pid]=process
# wait for any process to complete
pid, status = os.waitpid(-1, 0)
# one process terminated
# join it
processes[pid].join()
del processes[pid]
# terminate the rest
for process in processes.itervalues():
process.terminate()
for process in processes.itervalues():
process.join()
(更新)基于您的完整代码,这里是一个工作示例。它使用Event
对象,而不是容易出错的os.waitpid()
,其他进程在完成时设置:
from multiprocessing import Process, Event
class MyProcess(Process):
def __init__(self, event, *args, **kwargs):
self.event = event
Process.__init__(self, *args, **kwargs)
def run(self):
Process.run(self)
self.event.set()
class PythonHelper(object):
@staticmethod
#with your fix
def run_in_parallel(*functions):
event = Event()
processes=[]
for function in functions:
process=MyProcess(event, target=function)
process.start()
processes.append(process)
# wait for any process to complete
event.wait()
# one process completed
# terminate all child processes
for process in processes:
process.terminate()
for process in processes:
process.join()
class Logger(object):
def run_numbers_1(self):
for i in range(90000):
print("1 number: {}".format(i))
def run_numbers_2(self):
for i in range(10000):
print("2 number: {}".format(i))
def run_logger(self):
PythonHelper.run_in_parallel(self.run_numbers_1,self.run_numbers_2)
if __name__=="__main__":
logger=Logger()
logger.run_logger()