Python asyncio取消未等待的协程



给定一个复杂的设置,用来生成一个半并行运行的查询列表(使用一个信号量来避免同时运行太多的查询,避免DDoS服务器)。

我有一个(本身async)函数,创建一些查询:

async def run_query(self, url):
async with self.semaphore:
return await some_http_lib(url)
async def create_queries(self, base_url):
# ...gathering logic is ofc a bit more complex in the real setting
urls = await some_http_lib(base_url).json()

coros = [self.run_query(url) for url in urls]  # note: not executed just yet
return coros
async def execute_queries(self):
queries = await self.create_queries('/seom/url')
_logger.info(f'prepared {len(queries)} queries')
results = []
done = 0
# note: ofc, in this simple example call these would not actually be asynchronously executed.
# in the real case i'm using asyncio.gather, this just makes for a slightly better
# understandable example.
for query in queries:
# at this point, the request is actually triggered
result = await query
# ...some postprocessing
if not result['success']:
raise QueryException(result['message'])  # ...internal exception

done  += 1
_logger.info(f'{done} of {len(queries)} queries done')
results.append(result)

return results

现在这工作得很好,完全按照我的计划执行,我可以通过终止整个操作来处理其中一个查询中的异常。

async def run():
try:
return await QueryRunner.execute_queries()
except QueryException:
_logger.error('something went horribly wrong')
return None

唯一的问题是程序被终止,但留给我通常的RuntimeWarning: coroutine QueryRunner.run_query was never awaited,因为队列中稍后的查询(正确地)没有执行,因此没有等待。

是否有办法取消这些未等待的协程?否则,是否有可能抑制这一警告?

[Edit]关于如何在这个简单的例子之外执行查询的更多上下文:查询通常被分组在一起,因此有多个调用create_queries()的不同参数。然后循环所有收集到的组,并使用asyncio.gather(group)执行查询。这等待一个组的所有查询,但如果一个组失败,其他组也被取消,这导致错误被抛出。

那么您正在询问如何取消尚未等待或传递给gather的协程。有两个选项:

  • 可以呼叫asyncio.create_task(c).cancel()
  • 你可以直接在协程对象
  • 上调用c.close()

第一个选项有点重量级(它创建任务只是为了立即取消它),但它使用文档中的asyncio功能。第二个选项更轻量级,但也更低级。

以上适用于从未转换为任务的协程对象(例如,通过将它们传递给gatherwait)。如果它们有,例如,如果您调用asyncio.gather(*coros),其中一个引发,并且您想取消其余的,您应该更改代码,首先使用asyncio.create_task()将它们转换为任务,然后调用gather,并使用finally取消未完成的任务:

tasks = list(map(asyncio.create_task, coros))
try:
results = await asyncio.gather(*tasks)
finally:
# if there are unfinished tasks, that is because one of them
# raised - cancel the rest
for t in tasks:
if not t.done():
t.cancel()

使用

pending = asyncio.tasks.all_tasks()  # < 3.7 

pending = asyncio.all_tasks()  # >= 3.7 (not sure)

获取挂起任务列表。你可以用

等他们
await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)

或取消它们:

for task in pending:
task.cancel()

最新更新