我有一个方法可能需要很长时间才能完成。如果花了30秒,它应该进入下一行代码。当使用信号时,它看起来像这样:
import signal
def handler(signum, frame):
print('Signal handler called with signal', signum)
signal.signal(signal.SIGALRM, handler)
signal.alarm(30)
result = get_something_that_takes_time()
signal.alarm(0)
call_more_methods(result)
现在我需要以跨平台的方式来做这件事。我尝试使用线程:
import threading
def thread_handler():
raise Exception("Timeout in dateparser search_dates after 30 seconds")
try:
timer = threading.Timer(30, thread_handler)
timer.start()
result = get_something_that_takes_time()
timer.cancel()
except Exception as e:
print(e)
call_more_methods(result)
但它没有起作用。现在,如果可能的话,我正在考虑使用asyncio
。
谢谢你的帮助!
您可以使用asyncio
来完成此操作,前提是您正在等待的方法是一个不可用的对象。相关方法称为asyncio.wait_for.
import asyncio
async def sleep_awhile():
await asyncio.sleep(5)
print("Just woke up...")
async def main():
try:
await asyncio.wait_for(sleep_awhile(), 3)
except asyncio.TimeoutError:
print("Timed out before waking up...")
asyncio.run(main())
# Timed out before waking up...