我正在处理以下问题:
我实现了一个虚拟的"Thing"类,它睡眠10秒,然后记录一条消息("foo")。该类在进程池的工作函数中实例化,并调用实现上述逻辑的"foo"方法。
我想要实现的是自定义信号处理:只要进程尚未终止,当发送CTRL+C(SIGINT)时,每个进程都会记录该信号,然后立即终止。
一半的逻辑是有效的:当每个进程都在睡眠时,在SIGINT上,它们将被中断,池将被关闭。
问题:如果所有进程都已成功结束,此时再发送SIGINT,消息会被记录,但池不会被关闭。
代码:
import logging
import signal
import os
import time
from multiprocessing import Pool, current_process
logger = logging.getLogger('test')

# Reverse lookup table: signal value -> signal name (e.g. SIGINT's value -> 'SIGINT').
# Names are visited in descending alphabetical order, so when several names
# share one value the alphabetically first name is written last and wins.
# Only real signal names are kept; the 'SIG_*' constants are skipped.
SIGNAL_NAMES = {
    value: name
    for name, value in sorted(vars(signal).items(), reverse=True)
    if name.startswith('SIG') and not name.startswith('SIG_')
}
class Thing(object):
    """Dummy unit of work: sleeps for ten seconds, then logs a message."""

    def __init__(self, my_id):
        """Remember the ID and create the logger used by :meth:`foo`.

        Args:
            my_id: Identifier of this piece of work; used in the logger name
                and in the logged message.
        """
        self.my_id = my_id
        # Use the 'Thing-<id>' name that run_thing() raises to INFO. With the
        # bare ID as the name, that setLevel() call configured a different
        # logger and the INFO message emitted by foo() was filtered out.
        self.logger = logging.getLogger('Thing-{}'.format(my_id))

    def foo(self):
        """Sleep for 10 seconds, then log an INFO message carrying the ID."""
        time.sleep(10)
        self.logger.info('[%s] Foo after 10 secs!', self.my_id)
class Daemon(object):
    """Feed generated IDs to a pool of worker processes until asked to stop.

    SIGINT in the process that builds the daemon is routed to :meth:`stop`,
    which only raises a flag polled by the ID generator. Each pool worker
    installs :meth:`worker_signal_handler` instead (see
    :meth:`pool_initializer`).
    """

    def __init__(self, no_processes, max_count):
        """Initialise the state, install the SIGINT handler and start the pool.

        Args:
            no_processes: Number of worker processes in the pool.
            max_count: Number of IDs the generator hands out.
        """
        # Set up the state before installing the handler: if a SIGINT arrived
        # between the two steps, stop() would set ``done`` and the assignment
        # below would silently reset it, losing the stop request.
        self.done = False
        self.count = 0
        self.max_count = max_count
        signal.signal(signal.SIGINT, self.stop)
        self.pool = Pool(no_processes, initializer=self.pool_initializer)

    def stop(self, signum, _):
        """Signal handler for the daemon: log the signal and request a stop.

        Args:
            signum: Number of the received signal.
            _: Current stack frame (unused).
        """
        sig = SIGNAL_NAMES.get(signum) or signum
        # The closing parenthesis was missing from this message.
        logger.info('[Daemon] Stopping (received signal %s)', sig)
        self.done = True

    def _generate_ids(self):
        """Yield the IDs consumed by the process pool.

        While ``done`` is not set, one ID is produced per iteration (after a
        3 second pause) until ``max_count`` IDs have been handed out; after
        that the loop only idles in 1 second steps until ``done`` is set.
        """
        while not self.done:
            if self.count < self.max_count:
                my_id = "ID-{}".format(self.count)
                logger.info('[Daemon] Generated ID %s', my_id)
                time.sleep(3)
                yield my_id
                self.count += 1
            # Pause between polls of ``done``.
            time.sleep(1)

    def run(self):
        """Dispatch the generated IDs to the pool, then close and join it."""
        pid = os.getpid()
        logger.info('[Daemon] Started running on PID %s', str(pid))
        my_ids = self._generate_ids()
        # Results arrive in completion order; run_thing may return None.
        for res in self.pool.imap_unordered(run_thing, my_ids):
            logger.info("[Daemon] Finished %s", res or '')
        logger.info('[Daemon] Closing & waiting processes to terminate')
        self.pool.close()
        self.pool.join()

    def pool_initializer(self):
        """Pool initializer: install the worker-side SIGINT handler."""
        signal.signal(signal.SIGINT, self.worker_signal_handler)

    @staticmethod
    def worker_signal_handler(signum, _):
        """Signal handler for a pool worker.

        Logs the signal together with the ID currently being processed (if
        any) and sets ``WORKER_EXITING``. It does not terminate the process
        itself.

        Args:
            signum: Number of the received signal.
            _: Current stack frame (unused).
        """
        global WORKER_EXITING
        sig = SIGNAL_NAMES.get(signum) or signum
        cp = current_process()
        logger.info("[%s] Received in worker %s signal %s", WORKER_THING_ID or '', str(cp), sig)
        WORKER_EXITING = True
# Set to True by Daemon.worker_signal_handler when the worker receives SIGINT;
# run_thing() checks it first and returns without doing any work once it is set.
WORKER_EXITING = False
# ID that run_thing() is currently processing (None while no task is running);
# read by Daemon.worker_signal_handler for its log message.
WORKER_THING_ID = None
def run_thing(arg):
    """Pool worker entry point: build a Thing for *arg* and run its foo().

    Args:
        arg: The ID handed out by the daemon's ID generator.

    Returns:
        *arg* once the work has been attempted (even if it raised an
        ``Exception``, which is logged), or ``None`` when the worker has
        already been asked to exit and the task is skipped.
    """
    global WORKER_THING_ID

    # A SIGINT was already seen by this worker: skip the task entirely.
    if WORKER_EXITING:
        return None

    # Publish the current ID so the signal handler can mention it in its log.
    WORKER_THING_ID = arg
    logger.info('[%s] START Thing foo-ing', arg)
    logging.getLogger('Thing-{}'.format(arg)).setLevel(logging.INFO)

    failure = None
    try:
        Thing(arg).foo()
    except Exception as exc:
        # Keep the error for later so that the STOP line is logged first.
        failure = exc
    finally:
        WORKER_THING_ID = None
        logger.info('[%s] STOP Thing foo-ing', arg)

    if failure:
        logger.error('[%s] EXCEPTION on Thing foo-ing: %s', arg, failure)
    return arg
if __name__ == '__main__':
    # Attach a default handler to the root logger, then let the module
    # logger emit INFO records.
    logging.basicConfig()
    logger.setLevel(logging.INFO)

    # Four worker processes, three IDs to hand out.
    daemon = Daemon(no_processes=4, max_count=3)
    daemon.run()
您的问题出在函数_generate_ids()的逻辑上。该函数永远不会结束,所以pool.imap_unordered()也永远不会自行结束,只能通过CTRL-C来中断。
把它改成这样:
def _generate_ids(self):
    """Yield at most ``max_count`` IDs for the process pool, then finish.

    Unlike an endless polling loop, this generator is exhausted after
    ``max_count`` IDs (or as soon as ``done`` is set), so the pool's
    ``imap_unordered()`` iteration can end on its own.
    """
    for _ in range(self.max_count):
        # Do not wait 3 more seconds if a stop was already requested.
        if self.done:
            break
        time.sleep(3)
        # A stop may have been requested during the sleep. Check before
        # logging so that no ID is reported as generated and then dropped.
        if self.done:
            break
        my_id = "ID-{}".format(self.count)
        logger.info('[Daemon] Generated ID %s', my_id)
        self.count += 1
        yield my_id
这样修改后,各个进程就能正常结束。