块大小或超时异步 python



我试图找出一种简单的方法来解绑队列,方法是获取指定chunk_size超时的队列块。

例如,我希望get_chunks函数返回一个chunk_size项目列表,如果获取它们所需的时间少于超时,否则返回长度在 9 到chunk_size之间的列表。

这是到目前为止的代码:

import asyncio
async def populate(queue):
for i in range(0, 100):
await queue.put(i)
async def _get_chunks(queue, chunk_size):
items = []
for i in range(0, chunk_size):
items.append(await queue.get())
await asyncio.sleep(0.2)
return items
async def get_chunks(queue, chunk_size, timeout):
while True:
yield _get_chunks(queue, chunk_size)
async def listen():
queue = asyncio.Queue()
await populate(queue)
print(f'{queue.qsize()} items in queue')
async for chunk in get_chunks(queue, 10, 1):
print(await chunk)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(listen())
if __name__ == '__main__':
main()  

我认为有一种方法可以使用asyncio.wait来做到这一点:

done, not_done = asyncio.wait([_get_chunks(queue, size),
asyncio.sleep(timeout)],
return_when=asyncio.FIRST_COMPLETE)
items = done.pop().result()

但是当asyncio.sleep首先返回时,我无法得到结果。

您无法获得结果,因为_get_chunks尚未完成。一个简单的解决方法是在_get_chunks及其调用方之间具有某种共享状态:

async def _get_chunks(queue, chunk_size, out):
for i in range(0, chunk_size):
out.append(await queue.get())
await asyncio.sleep(0.2)

然后您可以使用wait_for实现超时,这将自动取消超时协程:

items = []
try:
asyncio.wait_for(_get_chunk(queue, size, items))
except asyncio.TimeoutError:
pass
# items now contains the elements _get_chunk managed to extract
# from the queue within the alotted time

最新更新