Python 警报信号,从外部超时当前线程并处理异常?



众所周知,python中的信号只在主线程内部工作,这是我关于这个主题的小片段:

import signal
from threading import Timer
from time import sleep
class timeout:
def __init__(self, seconds=1, error_message='Timeout error'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
def main():
try:
with timeout(seconds=2) :
#do_something
sleep(3)
print ("don't come here after 3 seconds")
except Exception as e:
print ("catch here",str(e))
print ("continue ...")
t = Timer(0.0, main)
t.start()

现在,为了强制它工作,我用钩子动态函数将signal.signal放在线程外部。

class timeout:
def __init__(self, seconds=1, error_message='Timeout error'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self):
raise TimeoutError(self.error_message)
def __enter__(self):
#fluid.error = self.error_message
#fluid.__call__ = self.handle_timeout
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
class fluid:
error = 'Orpheline exception'
def __init__(self,signum,frame):
self.signum = signum
self.frame = frame
def __call__(self):
try:
raise TimeoutError(self.error)
except Exception as e:
print ("catch now", str(e))
signal.signal(signal.SIGALRM, lambda x,y:fluid(x,y)())
t = Timer(0.0, main)
try:
t.start()
except Exception as e:
print ("catch there",str(e))

使用猴子补丁涉足这个问题会产生以下问题:

  • 如果我取消注释:fluid.error = self.error_message,异常在类中捕获。
  • 如果我取消注释:fluid.__call__ = self.handle_timeout,异常既没有在两个主要处理程序中捕获,程序退出!

唯一对我有用的解决方案是提供一个名为skipvalue的新标志值来检查此并行线程中是否存在异常:

class timeout:
def __init__(self, seconds=1, error_message='Timeout error'):
self.seconds = seconds
self.error_message = error_message
self.skipvalue = False
self.SKIP = lambda : self.skipvalue
def handle_timeout(self):
raise TimeoutError(self.error_message)
def timeitout(self):
#print('not caught ',self.error_message)
self.skipvalue = True
def __enter__(self):
fluid.error = self.error_message
#fluid.__call__ = self.handle_timeout
fluid.__call__ = self.timeitout
signal.alarm(self.seconds)
return self.SKIP
def __exit__(self, type, value, traceback):
signal.alarm(0)
def main():
try:
with timeout(seconds=2,error_message="Some message") as e :
#do_something
sleep(3)
if e():
raise Timeout(fluid.error)
print ("don't come here after 3 seconds")
except Exception as e:
print ("catch here",str(e))
print ("continue ...")
t = Timer(0.0, main)
t.start()

以上使用睡眠功能最多需要 3 秒,使用任意循环,我需要在每个执行周期检查新值。


我的问题

  • 是否有一些更优雅和内置的方式来实现这一目标,即用不必要的变量或类填充代码或分叉子进程?

出现 没有办法用计时器做到这一点,但使用带有系统跟踪的线程似乎半可能

import sys
import trace
import threading
import time
import signal

class thread_with_trace(threading.Thread):
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, target=kwargs["target"],args=(self,))
self.killed = False
self.ex_handler = kwargs["handler"]
def start(self):
self.__run_backup = self.run
self.run = self.__run
threading.Thread.start(self)
def __run(self):
sys.settrace(self.globaltrace)
self.__run_backup()
self.run = self.__run_backup
def globaltrace(self, frame, event, arg):
if event == 'call':
return self.localtrace
else:
return None
def localtrace(self, frame, event, arg):
if self.killed:
if event == 'line':
raise SystemExit()
return self.localtrace
def kill(self):
self.killed = True
raise self.ex_handler
class fluid:
def __init__(self,signum,frame):
self.signum = signum
self.frame = frame
# do whatever according to signal id
def __call__(self):
pass
signal.signal(signal.SIGALRM, lambda x,y:fluid(x,y)())
class timeout:
def __init__(self, thread=lambda: None, terminatefun=lambda: None, seconds=10):
self.seconds = seconds
self.thisthread = thread
self.terminatefun = terminatefun
def handle_timeout(self):
try:
self.thisthread.kill()
except Exception as e:
print(str(e))
self.terminatefun()
def __enter__(self):
fluid.__call__ = self.handle_timeout
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
def stopit():
print("I should be here after two seconds")
def func(t):
with timeout(thread=t, terminatefun=stopit ,seconds=2):
while True:
time.sleep(0.1)
print("I'm running")

t1 = thread_with_trace(target=func,args=[],handler=TimeoutError("Ran out of time"))
t1.start()

它几乎符合要求,因为SystemExit()在上次超时到期后停止线程(在本例中为 0.1 秒(

最新更新