我在调试一个问题时遇到了一些问题,我有一个异步项目,我希望它能正常关闭。
import asyncio
import signal
async def clean_loop(signal, loop):
print("something")
tasks = [t for t in asyncio.all_tasks() if t is not
asyncio.current_task()]
[task.cancel() for task in tasks]
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def main():
loop = asyncio.get_event_loop()
signals = (signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(clean_loop(s, loop)))
task = loop.create_task(run(some_code))
loop.call_later(60, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
finally:
loop.close()
if __name__ == "__main__":
main()
当我运行代码并从另一个屏幕发送KeyboardInterrupt或TERM信号时,似乎什么都没有发生,看起来也没有调用clean_loop。
编辑:我想我能够进一步隔离这个问题,我在some_code
中运行的代码有一个内循环,其中包含另一个asyncio.gather(*tasks)
,当我注释掉它时,我能够捕捉到信号并运行clean_loop。有人能解释为什么会发生这种冲突吗?
如果您使用的是类Unix系统,
"quot"为信号添加一个处理程序。仅限UNIX。
它应该可以正常工作。唯一的问题是clean_loop
本身就是一个被loop.stop()
破坏的任务,因此你可以看到:
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<clean_loop() running at ...> wait_for=<_GatheringFuture finished result=[CancelledError('')]>>
但这并不重要,因为它只是用于cancel
任务。
我做了一些与实际问题无关的微小更改:
import asyncio
import signal
async def run():
for i in range(4):
print(i)
await asyncio.sleep(2)
async def clean_loop(signal, loop):
print("-----------------------handling signal")
tasks = asyncio.all_tasks() - {asyncio.current_task()}
for task in tasks:
task.cancel()
print("--------------------------reached here")
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
def main():
loop = asyncio.new_event_loop()
signals = (signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(clean_loop(s, loop)))
task = loop.create_task(run())
loop.call_later(60, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
finally:
loop.close()
if __name__ == "__main__":
main()
输出:
0
1
2
^C-----------------------handling signal
--------------------------reached here
Task was destroyed but it is pending!
task: <Task pending name='Task-2' coro=<clean_loop() running at ...> wait_for=<_GatheringFuture finished result=[CancelledError('')]>>
根据您的编辑:
add_signal_handler的文档显示:
回调将由循环调用以及其他排队的该事件循环的回调和可运行的协同程序。
就像其他协程一样,循环应该以协作的方式调用它。换句话说,协程应该将控制权交还给事件循环,以便事件循环可以运行另一个协程。
在您的情况下,具有无限循环的协程会阻止事件循环运行其他协程,尤其是通过add_signal_handler
注册的回调。它不合作!这就是为什么你认为它不起作用。它闲置在队伍中等待奔跑。