如何使用 rxpython 使长时间运行的程序超时?



假设我有一个长时间运行的python函数,看起来像这样?

import random
import time
from rx import Observable
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x

我希望能够设置1000ms超时。

所以我在做类似的东西,创建一个可观察的,并通过上面的密集计算来映射它。

a = Observable.repeat(1).map(lambda x: intns(x))

现在对于发出的每个值,如果它花费超过 1000 毫秒,我想在使用on_erroron_completed达到1000ms后立即结束可观察量

a.timeout(1000).subscribe(lambda x: print(x), lambda x: print(x))

上面的语句确实超时,并调用on_error,但它继续完成密集计算,然后才返回到下一个语句。有没有更好的方法

?最后一条语句打印以下内容

8 # no of seconds to sleep
begin # begins sleeping, trying to emit the first value
Timeout # operation times out, and calls on_error
end # thread waits till the function ends

这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果。

我想知道intns函数是否在单独的线程上完成,我想主线程在超时后继续执行,但我仍然想停止计算线程上的intns函数,或者以某种方式杀死它。

下面是一个可以使用with timeout() :调用的类

如果代码下的块运行时间超过指定时间,则会引发TimeoutError

import signal
class timeout:
# Default value is 1 second (1000ms)
def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
# example usage
with timeout() :
# infinite while loop so timeout is reached
while True :
pass

如果我理解你的函数,下面是你的实现的样子:

def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
with timeout() :
time.sleep(y)
print('end')
return x

您可以使用线程部分执行此操作 虽然在 python 中没有特定的方法来杀死线程,但你可以实现一种方法来标记线程结束。

如果线程正在等待其他资源,这将不起作用(在您的情况下,您通过随机等待模拟了"长时间"运行的代码(

参见 有没有办法杀死Python中的线程?

这样它的工作原理:

import random
import time
import threading
import os
def intns(x):
y = random.randint(5,10)
print(y)
print('begin')
time.sleep(y)
print('end')
return x

thr = threading.Thread(target=intns, args=([10]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
if(time.clock() - st > 9):
os._exit(0)

下面是超时的示例

import random
import time
import threading
_timeout = 0
def intns(loops=1):
print('begin')
processing = 0
for i in range(loops):
y = random.randint(5,10)
time.sleep(y)
if _timeout == 1:
print('timedout end')
return
print('keep processing')
return
# this will timeout
timeout_seconds = 10
loops = 10
# this will complete
#timeout_seconds = 30.0
#loops = 1
thr = threading.Thread(target=intns, args=([loops]), kwargs={})
thr.start()
st = time.clock();
while(thr.is_alive() == True):
if(time.clock() - st > timeout_seconds):
_timeout = 1
thr.join()
if _timeout == 0:
print ("completed")
else:
print ("timed-out")

您可以使用 time.sleep(( 并为 time.clock(( 创建一个 while 循环

最新更新