将async for与if条件相结合以在等待中中断的正确方法是什么



如果我有一个使用异步生成器项的协同程序;最好的";如何从外部条件终止循环?

考虑一下,

while not self.shutdown_event.is_set():
async with self.external_lib_client as client:
async for message in client:
if self.shutdown_event.is_set():
break
await self.handle(message)

如果我设置shutdown_event,它将突破while循环,但直到async for循环处理了下一个message。构造async for迭代器的正确方法是什么,以便在两个迭代器之间满足产生结果的条件时短路?

是否有添加Timeout的标准方法?

一种方法是将迭代移动到async def并使用cancellation:

async def iterate(client):
async for message in client:
# shield() because we want cancelation to cancel retrieval
# of the next message, not ongoing handling of a message
await asyncio.shield(self.handle(message))
async with self.external_lib_client as client:
iter_task = asyncio.create_task(iterate(client))
shutdown_task = asyncio.create_task(self.shutdown_event.wait())
await asyncio.wait([iter_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED)
if iter_task.done():
# iteration has completed, access result to propagate the
# exception if one was raised
iter_task.result()
shutdown_task.cancel()
else:
# shutdown was requested, cancel iteration
iter_task.cancel()

另一种方法是将shutdown_event转换为一次性异步流,并使用aiostream来监视两者。这样,当关闭事件发出信号时,for循环会得到一个对象,并且可以在不需要等待下一条消息的情况下脱离循环:

# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())
async with self.external_lib_client as client, 
aiostream.stream.merge(done_stream, client).stream() as stream:
async for message in stream:
# the merged stream will provide a bogus value (whatever
# `shutdown_event.wait()` returned) when the event is set,
# so check that before using `message`:
if self.shutdown_event.is_set():
break
await self.handle(message)

注意:由于问题中的代码不可运行,以上示例未经测试。

最新更新