给定一个复杂的设置,用来生成一个半并行运行的查询列表(使用一个信号量来避免同时运行太多的查询,避免DDoS服务器)。
我有一个(本身async)函数,创建一些查询:
async def run_query(self, url):
async with self.semaphore:
return await some_http_lib(url)
async def create_queries(self, base_url):
# ...gathering logic is ofc a bit more complex in the real setting
urls = await some_http_lib(base_url).json()
coros = [self.run_query(url) for url in urls] # note: not executed just yet
return coros
async def execute_queries(self):
queries = await self.create_queries('/seom/url')
_logger.info(f'prepared {len(queries)} queries')
results = []
done = 0
# note: ofc, in this simple example call these would not actually be asynchronously executed.
# in the real case i'm using asyncio.gather, this just makes for a slightly better
# understandable example.
for query in queries:
# at this point, the request is actually triggered
result = await query
# ...some postprocessing
if not result['success']:
raise QueryException(result['message']) # ...internal exception
done += 1
_logger.info(f'{done} of {len(queries)} queries done')
results.append(result)
return results
现在这工作得很好,完全按照我的计划执行,我可以通过终止整个操作来处理其中一个查询中的异常。
async def run():
try:
return await QueryRunner.execute_queries()
except QueryException:
_logger.error('something went horribly wrong')
return None
唯一的问题是程序被终止,但留给我通常的RuntimeWarning: coroutine QueryRunner.run_query was never awaited
,因为队列中稍后的查询(正确地)没有执行,因此没有等待。
是否有办法取消这些未等待的协程?否则,是否有可能抑制这一警告?
[Edit]关于如何在这个简单的例子之外执行查询的更多上下文:查询通常被分组在一起,因此有多个调用create_queries()的不同参数。然后循环所有收集到的组,并使用asyncio.gather(group)执行查询。这等待一个组的所有查询,但如果一个组失败,其他组也被取消,这导致错误被抛出。
那么您正在询问如何取消尚未等待或传递给gather
的协程。有两个选项:
- 可以呼叫
asyncio.create_task(c).cancel()
- 你可以直接在协程对象 上调用
c.close()
第一个选项有点重量级(它创建任务只是为了立即取消它),但它使用文档中的asyncio功能。第二个选项更轻量级,但也更低级。
以上适用于从未转换为任务的协程对象(例如,通过将它们传递给gather
或wait
)。如果它们有,例如,如果您调用asyncio.gather(*coros)
,其中一个引发,并且您想取消其余的,您应该更改代码,首先使用asyncio.create_task()
将它们转换为任务,然后调用gather
,并使用finally
取消未完成的任务:
tasks = list(map(asyncio.create_task, coros))
try:
results = await asyncio.gather(*tasks)
finally:
# if there are unfinished tasks, that is because one of them
# raised - cancel the rest
for t in tasks:
if not t.done():
t.cancel()
使用
pending = asyncio.tasks.all_tasks() # < 3.7
或
pending = asyncio.all_tasks() # >= 3.7 (not sure)
获取挂起任务列表。你可以用
等他们await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)
或取消它们:
for task in pending:
task.cancel()