asyncio: automatic cancellation of subtasks



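If an asyncio task task_parent creates a child task task_child, and task_parent is then cancelled because of an exception raised after task_child was created, is task_child automatically cancelled as well (assuming it is not protected by asyncio.shield)?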

For example, in the following code:

async def f():
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1, = await asyncio.gather(t1)
    r3 = await process_result(r1)  # process_result throws an exception
    r2, = await asyncio.gather(t2)
    return await process_results(r2, r3)

If process_result(r1) throws an exception, is t2 automatically cancelled (and subsequently garbage collected)?

What if I don't use asyncio.gather and instead await the tasks directly:

async def f():
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    r1 = await t1  # awaiting a task yields its result directly, no unpacking
    r3 = await process_result(r1)  # process_result throws an exception
    r2 = await t2
    return await process_results(r2, r3)

If process_result(r1) throws an exception, is t2 automatically cancelled in this case too?

In short, the answer to all of your questions is "no".

In asyncio there is no concept of parent and child tasks, and no hierarchy among tasks of any kind. There is only one "level": all tasks are equal.
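
To make the "no" concrete, here is a minimal sketch showing that a task created inside a failing task keeps running. The names child, parent and demo are hypothetical and not taken from the question; the spawned list only exists to keep a strong reference to the task.

```python
import asyncio


async def child(log):
    await asyncio.sleep(0.05)
    log.append("child finished")


async def parent(log, spawned):
    # Spawn a task, then fail without ever awaiting it.
    spawned.append(asyncio.create_task(child(log)))
    raise RuntimeError("parent failed")


async def demo():
    log, spawned = [], []
    try:
        await asyncio.create_task(parent(log, spawned))
    except RuntimeError:
        log.append("parent failed")
    child_task = spawned[0]
    cancelled_early = child_task.cancelled()  # checked right after the failure
    await child_task  # the child still runs to completion on its own
    return log, cancelled_early


log, cancelled_early = asyncio.run(demo())
print(log, cancelled_early)
```

The failure of the creating task has no effect on the task it created; the child is only stopped if something cancels it explicitly or the event loop shuts down.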

To some extent you can enforce a dependency by explicitly cancelling the tasks in a finally block, for example:

async def f():
    t1 = asyncio.create_task(coroutine1())
    t2 = asyncio.create_task(coroutine2())
    try:
        r1 = await t1
        r3 = await process_result(r1)  # process_result throws an exception
        r2 = await t2
        return await process_results(r2, r3)
    finally:
        # cancel() is a no-op on tasks that have already finished
        t1.cancel()
        t2.cancel()

But this does not cancel any tasks that t1 and t2 themselves created. It establishes only a single level of dependency.
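
The single-level limitation can be seen in a small sketch. The names grandchild, child and demo are hypothetical; the keep list only holds a reference to the inner task so that it can be awaited afterwards.

```python
import asyncio


async def grandchild(events):
    await asyncio.sleep(0.05)
    events.append("grandchild finished")


async def child(events, keep):
    # The child starts a task of its own and then waits for a long time.
    keep.append(asyncio.create_task(grandchild(events)))
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        events.append("child cancelled")
        raise


async def demo():
    events, keep = [], []
    t = asyncio.create_task(child(events, keep))
    try:
        await asyncio.sleep(0.01)  # give the child a chance to start
        raise RuntimeError("boom")
    except RuntimeError:
        events.append("parent failed")
    finally:
        t.cancel()  # cancels the child only
    # Wait until the cancellation has actually been delivered.
    await asyncio.gather(t, return_exceptions=True)
    await keep[0]  # the grandchild was never cancelled
    return events


events = asyncio.run(demo())
print(events)
```

Cancelling t reaches the child, but the task the child spawned finishes normally unless the child cancels it in its own finally block.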

In a few recent small projects I have successfully used this class to organize tasks into a hierarchy:

import asyncio
import logging

logger = logging.getLogger(__name__)


class BagContextError(Exception):
    pass


class PBag:
    def __init__(self):
        self.futures = set()
        self.exceptions = []
        # The event is set whenever no tracked futures remain.
        self.done = asyncio.Event()
        self.done.set()
        self._opened = False
        self._closed = False

    @property
    def is_open(self):
        return self._opened and not self._closed

    def __await__(self):
        yield from self.done.wait().__await__()

    async def __aenter__(self):
        if self._opened:
            raise BagContextError("Already open")
        self._opened = True
        return self

    async def __aexit__(self, exc_type, _exc_value, _traceback):
        logger.debug("Bag exit %s %s %s", self.futures, self.exceptions,
                     exc_type, stack_info=True)
        self._closed = True
        await self.aclose()
        if self.exceptions:
            # Log every exception that will not be re-raised below.
            n = 1 if exc_type is None else 0
            for ex in self.exceptions[n:]:
                try:
                    raise ex
                except Exception:
                    logger.exception("Suppressed exception")
            if exc_type is None:
                raise self.exceptions[0]

    def until_done(self):
        return self.done.wait()

    def create_task(self, coro):
        if self._closed:
            raise BagContextError("Bag closed")
        t = asyncio.create_task(coro)
        self.add_future(t)
        return t

    def add_future(self, fut):
        if self._closed:
            raise BagContextError("Bag closed")
        self.futures.add(fut)
        fut.add_done_callback(self._future_done)
        self.done.clear()

    def close(self):
        # Request cancellation of every future still being tracked.
        for w in self.futures:
            w.cancel()

    async def aclose(self):
        self.close()
        await self.until_done()

    def _future_done(self, fut):
        try:
            self.futures.remove(fut)
        except KeyError:
            pass
        if not self.futures:
            self.done.set()
        try:
            fut.result()
        except asyncio.CancelledError:
            pass
        except Exception as ex:
            self.exceptions.append(ex)

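It is a context manager. Within its context, tasks are created with PBag.create_task rather than asyncio.create_task. The object keeps track of its dependent tasks and cancels them when an exception occurs, when the context exits, or when the close() method is called.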

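If you build your entire program with it, the tasks are arranged in a hierarchy, and when the outermost task is cancelled the whole structure unwinds (more or less) gracefully. If you use it only in some places and not in others (that is, if you still call asyncio.create_task in some places), the unwinding may not work as well.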

I don't have much experience with it yet, so there may well be undiscovered bugs. Here is a small demo program:

# Assumes the PBag class above is defined in the same module.
import time
import traceback


async def main():
    async def task1():
        print("Task1 started", time.ctime())
        await asyncio.sleep(2)
        print("Task1 finished", time.ctime())

    async def task2():
        print("Task2 started", time.ctime())
        await asyncio.sleep(3)
        raise Exception("Task 2 error")

    async def task3(bag):
        bag.create_task(task2())
        print("Task 3 done")

    try:
        async with PBag() as bag:
            bag.create_task(task1())
            bag.create_task(task3(bag))
            await bag.until_done()
            bag.create_task(task1())
            await bag  # alternative way to wait until done
    except asyncio.CancelledError:
        traceback.print_exc()
    except Exception:
        traceback.print_exc()
    print("Bag closed", time.ctime())
    # Created outside the bag, so nothing tracks or waits for this task.
    asyncio.create_task(task1())
    print("Program finished", time.ctime())


if __name__ == "__main__":
    asyncio.run(main())

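I managed to find the answer to my own question. Automatic task cancellation can be achieved with structured concurrency, which the current version of Python (Python 3.10) does not support, although there is a proposal to introduce TaskGroups following PEP 654.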

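Fortunately, there is the AnyIO library, which implements trio-like structured concurrency on top of asyncio. The example from my question can be rewritten with AnyIO so that the tasks are cancellable: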

import asyncio
from contextlib import AsyncExitStack

from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group


async def coroutine1(send_stream):
    async with send_stream:
        await send_stream.send(1)


async def coroutine2(send_stream):
    async with send_stream:
        await asyncio.sleep(1)
        await send_stream.send(2)


async def process_result(receive_stream, send_stream):
    async with AsyncExitStack() as stack:
        rs = await stack.enter_async_context(receive_stream)
        ss = await stack.enter_async_context(send_stream)
        res_rs = await rs.receive()
        raise Exception
        await ss.send(res_rs + 1)  # never reached: the exception above is deliberate


async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
    task_status.started()
    async with AsyncExitStack() as stack:
        rs_2 = await stack.enter_async_context(receive_stream_2)
        rs_3 = await stack.enter_async_context(receive_stream_3)
        res_rs_2 = await rs_2.receive()
        res_rs_3 = await rs_3.receive()
        return res_rs_2 + res_rs_3


async def f():
    async with create_task_group() as tg:
        send_stream_1, receive_stream_1 = create_memory_object_stream(1)
        tg.start_soon(coroutine1, send_stream_1)
        send_stream_2, receive_stream_2 = create_memory_object_stream(1)
        tg.start_soon(coroutine2, send_stream_2)
        send_stream_3, receive_stream_3 = create_memory_object_stream(1)
        tg.start_soon(process_result, receive_stream_1, send_stream_3)
        # process_result will raise an Exception which will cancel all tasks in tg group
        result = await process_results(receive_stream_2, receive_stream_3)
        print(result)


asyncio.run(f())
