用Python处理ctypes函数中的无限循环



假设我在C中定义了几个函数,其中一个函数会导致无限循环。我使用Python中的ctypes模块来运行这些函数中的每一个,因此它会导致一个无限循环,导致我的Python脚本完全停止。

我试图在超时的情况下运行C函数,但超时从未被触发。我该如何处理?我无法停止我的Python脚本,如果遇到这个无限循环函数,我的要求是打印一条错误消息,然后继续下一个函数。超时看起来像这样:

limit = 10
def raise_timeout(signum, frame):
raise TimeoutError
def timeout(limit):
signal.signal(signal.SIGALARM, raise_timeout)
signal.alarm(limit)
try:
yield
except TimeoutError:
print('Timed out')
finally:
signal.signal(signal.SIGALARM, signal.SIG_IGN)

## Attempting to run function
for function in function_list:
#Perform actions to load the shared .so library, define res/argtypes etc
with timeout(limit):
result = c_lib() # This is the function I run from ctypes

我看到的唯一方法是使用计时器来处理它,例如,10秒左右。但我觉得我做得不对——我的Python脚本有没有办法发现ctypes函数已经10秒没有响应了,因此它应该以某种方式退出?

我在这里很绝望,任何一种有效但违背常识的恶作剧都是可以的(

感谢您的帮助。

据我所知,如果C代码挂起,Python就无法重新获得控制权。您可以尝试使用多处理池来调用该函数。

演示:

import multiprocessing as mp
from multiprocessing.pool import Pool
import time
def worker(work):
time.sleep(work)  # pretend to do some work
return work
def call(func,work_time,timeout):
global p
a = p.apply_async(func,(work_time,))  # call function using pool
try:
result = a.get(timeout)           # wait for asynchronous result
print(f'{result=}')
except mp.TimeoutError:
p.terminate()                     # kill the pool
p.join()                          # wait for the pool processes to stop
print('Terminated.')
p = Pool(1)                       # restart pool
if __name__ == '__main__':
p = Pool(1)       # global process pool (1 process)
call(worker,1,3)
call(worker,2,3)
call(worker,3,3)
call(worker,4,3)
p.close()         # Tell all processes in pool to stop
p.join()          # Wait for them to stop.

输出:

result=1
result=2
Terminated.
Terminated.

我也遇到过同样的问题,虽然多处理解决方案很好,但实现起来可能有点麻烦。我成功地使用了信号。警报:

timeout = 3
signal.alarm(timeout) # sets the timeout to 3 seconds
ctype_call_that_may_block_forever()
signal.alarm(0) # disables the alarm

相关内容

  • 没有找到相关文章

最新更新