如果我有一个使用异步生成器项的协同程序;最好的";如何从外部条件终止循环?
考虑一下,
while not self.shutdown_event.is_set():
async with self.external_lib_client as client:
async for message in client:
if self.shutdown_event.is_set():
break
await self.handle(message)
如果我设置shutdown_event
,它将突破while循环,但直到async for
循环处理了下一个message
。构造async for
迭代器的正确方法是什么,以便在两个迭代器之间满足产生结果的条件时短路?
是否有添加Timeout
的标准方法?
一种方法是将迭代移动到async def
并使用cancellation:
async def iterate(client):
async for message in client:
# shield() because we want cancelation to cancel retrieval
# of the next message, not ongoing handling of a message
await asyncio.shield(self.handle(message))
async with self.external_lib_client as client:
iter_task = asyncio.create_task(iterate(client))
shutdown_task = asyncio.create_task(self.shutdown_event.wait())
await asyncio.wait([iter_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED)
if iter_task.done():
# iteration has completed, access result to propagate the
# exception if one was raised
iter_task.result()
shutdown_task.cancel()
else:
# shutdown was requested, cancel iteration
iter_task.cancel()
另一种方法是将shutdown_event
转换为一次性异步流,并使用aiostream来监视两者。这样,当关闭事件发出信号时,for
循环会得到一个对象,并且可以在不需要等待下一条消息的情况下脱离循环:
# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())
async with self.external_lib_client as client,
aiostream.stream.merge(done_stream, client).stream() as stream:
async for message in stream:
# the merged stream will provide a bogus value (whatever
# `shutdown_event.wait()` returned) when the event is set,
# so check that before using `message`:
if self.shutdown_event.is_set():
break
await self.handle(message)
注意:由于问题中的代码不可运行,以上示例未经测试。