python asyncio cancel run_in_executor阻塞主线程



当通过asyncio run_in_executor启动同步函数然后取消它时,我看到了一个不想要的行为。ThreadPoolExecutor(以及ProcessPoolExecutor)上下文管理器将调用执行器。退出时关机,等待所有挂起的工作完成(在其线程上使用join())。

我希望连接是异步管理的,所以其他任务可能不会被阻塞,但是我发现连接是同步调用的。

run_in_executor的实现应该考虑到这一点吗?还是应该处理(比如调用executor。线程关闭)?

这个示例代码启动了2个任务,并取消了在子线程上运行的一个,从日志中可以看到子线程是取消后唯一执行的代码:

from time import sleep
from asyncio import (
get_running_loop, 
sleep as async_sleep,
get_event_loop,
create_task
)
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
​
​
def sync_watchdog():
for _ in range(30):
print(f'{datetime.now()} I am in a thread')
sleep(1)
​
​
async def inthread():
with ThreadPoolExecutor() as executor:
await get_running_loop().run_in_executor(executor, sync_watchdog)
​
async def watchdog():
for _ in range(30):
print(f'{datetime.now()} I am an async watchdog')
await async_sleep(1)
​
​
async def run():
intread_task = create_task(inthread())
watchdog_task = create_task(watchdog())
​
await async_sleep(2)
​
print('canceling task with subthread')
intread_task.cancel()
​
await async_sleep(10)
​
​
get_event_loop().run_until_complete(run())

输出:

2021-10-27 17:08:16.796171 I am in a thread
2021-10-27 17:08:16.796248 I am an async watchdog
2021-10-27 17:08:17.797205 I am an async watchdog
2021-10-27 17:08:17.797310 I am in a thread
canceling task with subthread
2021-10-27 17:08:18.797990 I am an async watchdog
2021-10-27 17:08:18.798430 I am in a thread
2021-10-27 17:08:19.799037 I am in a thread
2021-10-27 17:08:20.800327 I am in a thread
2021-10-27 17:08:21.801521 I am in a thread
2021-10-27 17:08:22.802819 I am in a thread
2021-10-27 17:08:23.804014 I am in a thread
2021-10-27 17:08:24.805434 I am in a thread
2021-10-27 17:08:25.806640 I am in a thread
2021-10-27 17:08:26.807797 I am in a thread
2021-10-27 17:08:27.808880 I am in a thread
2021-10-27 17:08:28.810305 I am in a thread
2021-10-27 17:08:29.811247 I am in a thread
2021-10-27 17:08:30.811901 I am in a thread
2021-10-27 17:08:31.813083 I am in a thread
2021-10-27 17:08:32.814301 I am in a thread
2021-10-27 17:08:33.815499 I am in a thread
2021-10-27 17:08:34.816565 I am in a thread
2021-10-27 17:08:35.817920 I am in a thread
2021-10-27 17:08:36.818478 I am in a thread
2021-10-27 17:08:37.819603 I am in a thread
2021-10-27 17:08:38.820844 I am in a thread
2021-10-27 17:08:39.822042 I am in a thread
2021-10-27 17:08:40.822491 I am in a thread
2021-10-27 17:08:41.823690 I am in a thread
2021-10-27 17:08:42.824871 I am in a thread
2021-10-27 17:08:43.826012 I am in a thread
2021-10-27 17:08:44.826516 I am in a thread
2021-10-27 17:08:45.827698 I am in a thread
2021-10-27 17:08:46.829813 I am an async watchdog

长话短说:.shutdown()只取消挂起的执行器任务,而不是已经运行的。

这是因为threading.Thread没有.cancel().abort()方法。如果你想在正在运行的线程中停止一个循环,创建一个标志,在每次迭代中检查它,如果你需要停止,就升起这个标志。

最新更新