关闭KeyboardInterrupt上的异步循环-运行停止例程



我正在使用python创建一个脚本,该脚本可以同时运行并与一些进程交互。为此,我使用asyncio来实现这种并行性。主要问题是当KeyboardInterrupt或SIGINT发生时,如何运行另一个清理例程。

以下是我为显示问题而编写的示例代码:

import asyncio
import logging
import signal
from time import sleep

class Process:
async def start(self, arguments):
self._process = await asyncio.create_subprocess_exec("/bin/bash", *arguments)
return await self._process.wait()
async def stop(self):
self._process.terminate()

class BackgroundTask:
async def start(self):
# Very important process which needs to run while process 2 is running
self._process1 = Process()
self._process1_task = asyncio.create_task(self._process1.start(["-c", "sleep 100"]))
self._process2 = Process()
self._process2_task = asyncio.create_task(self._process2.start(["-c", "sleep 50"]))
await asyncio.wait([self._process1_task, self._process2_task], return_when=asyncio.ALL_COMPLETED)
async def stop(self):
# Stop process
await self._process1.stop()
# Call a cleanup process which cleans up process 1
cleanup_process = Process()
await cleanup_process.start(["-c", "sleep 10"])
# After that we can stop our second process
await self._process2.stop()

backgroundTask = BackgroundTask()

async def main():
await asyncio.create_task(backgroundTask.start())

logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)

这段代码创建了一个后台任务,启动两个进程(在本例中是两个bash-sleep命令)并等待它们完成。这很好,并且两个命令都是并行运行的。

主要问题是停止程序。当程序接收到SIGINT或KeyboardInterrupt时,我想运行stop方法,它首先停止process1,然后启动清理方法,然后停止process2。这是必要的,因为cleanup命令依赖于process2。

我尝试过的(而不是asyncio.run()和async-main):

def main():
try:
asyncio.get_event_loop().run_until_complete(backgroundTask.start())
except KeyboardInterrupt:
asyncio.get_event_loop().run_until_complete(backgroundTask.stop())
main()

当然,这并没有按预期工作,因为一旦出现KeyboardInterrupt异常,backgroundTask.start任务就会被取消,backgroundTask.stop也会在主循环中启动,所以我的进程会被取消,无法正常停止。

那么,有没有一种方法可以在不取消当前主循环的情况下检测KeyboardInterrupt,并运行我的backgroundTask.stop方法?

您想要添加一个信号处理程序,如文档中的示例所示:

import asyncio
import functools
import os
import signal
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def main():
loop = asyncio.get_running_loop()
for signame in {'SIGINT', 'SIGTERM'}:
loop.add_signal_handler(
getattr(signal, signame),
functools.partial(ask_exit, signame, loop))
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
asyncio.run(main())

不过,这是一个有点过于复杂/过时的例子,请考虑它更像这样(您的协同程序代码位于asyncio.sleep调用所在的位置):

import asyncio
from signal import SIGINT, SIGTERM

async def main():
loop = asyncio.get_running_loop()
for signal_enum in [SIGINT, SIGTERM]:
loop.add_signal_handler(signal_enum, loop.stop)
await asyncio.sleep(3600) # Your code here

asyncio.run(main())

此时,Ctrl+C将中断循环并引发RuntimeError,您可以通过将asyncio.run调用放在try/except块中来捕获它,如下所示:

try:
asyncio.run(main())
except RuntimeError as exc:
expected_msg = "Event loop stopped before Future completed."
if exc.args and exc.args[0] == expected_msg:
print("Bye")
else:
raise

不过,这不是很令人满意(如果其他事情导致了相同的错误怎么办?),所以我更愿意提出一个明显的错误。此外,如果您在命令行上退出,正确的做法是返回正确的退出代码(事实上,示例中的代码只是使用名称,但它实际上是一个IntEnum,其中包含数字退出代码!)

import asyncio
from functools import partial
from signal import SIGINT, SIGTERM
from sys import stderr
class SignalHaltError(SystemExit):
def __init__(self, signal_enum):
self.signal_enum = signal_enum
print(repr(self), file=stderr)
super().__init__(self.exit_code)
@property
def exit_code(self):
return self.signal_enum.value
def __repr__(self):
return f"nExitted due to {self.signal_enum.name}"
def immediate_exit(signal_enum, loop):
loop.stop()
raise SignalHaltError(signal_enum=signal_enum)
async def main():
loop = asyncio.get_running_loop()
for signal_enum in [SIGINT, SIGTERM]:
exit_func = partial(immediate_exit, signal_enum=signal_enum, loop=loop)
loop.add_signal_handler(signal_enum, exit_func)
await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
asyncio.run(main())

当Ctrl+C退出时给出:

python cancelling_original.py

Event loop running for 1 hour, press Ctrl+C to interrupt.
^C
Exitted due to SIGINT
echo $?

2

现在有一些代码我很乐意提供!:^)

附言:这里有类型注释:

from __future__ import annotations
import asyncio
from asyncio.events import AbstractEventLoop
from functools import partial
from signal import Signals, SIGINT, SIGTERM
from sys import stderr
from typing import Coroutine
class SignalHaltError(SystemExit):
def __init__(self, signal_enum: Signals):
self.signal_enum = signal_enum
print(repr(self), file=stderr)
super().__init__(self.exit_code)
@property
def exit_code(self) -> int:
return self.signal_enum.value
def __repr__(self) -> str:
return f"nExitted due to {self.signal_enum.name}"
def immediate_exit(signal_enum: Signals, loop: AbstractEventLoop) -> None:
loop.stop()
raise SignalHaltError(signal_enum=signal_enum)
async def main() -> Coroutine:
loop = asyncio.get_running_loop()
for signal_enum in [SIGINT, SIGTERM]:
exit_func = partial(immediate_exit, signal_enum=signal_enum, loop=loop)
loop.add_signal_handler(signal_enum, exit_func)
return await asyncio.sleep(3600)
print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
asyncio.run(main())

这里的自定义异常的优点是,您可以专门捕获它,并避免将回溯转储到屏幕

try:
asyncio.run(main())
except SignalHaltError as exc:
# log.debug(exc)
pass
else:
raise

最新更新