我试图找出一种简单的方法来解绑队列,方法是获取指定chunk_size和超时的队列块。
例如,我希望get_chunks
函数返回一个chunk_size项目列表,如果获取它们所需的时间少于超时,否则返回长度在 9 到chunk_size之间的列表。
这是到目前为止的代码:
import asyncio
async def populate(queue):
for i in range(0, 100):
await queue.put(i)
async def _get_chunks(queue, chunk_size):
items = []
for i in range(0, chunk_size):
items.append(await queue.get())
await asyncio.sleep(0.2)
return items
async def get_chunks(queue, chunk_size, timeout):
while True:
yield _get_chunks(queue, chunk_size)
async def listen():
queue = asyncio.Queue()
await populate(queue)
print(f'{queue.qsize()} items in queue')
async for chunk in get_chunks(queue, 10, 1):
print(await chunk)
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(listen())
if __name__ == '__main__':
main()
我认为有一种方法可以使用asyncio.wait
来做到这一点:
done, not_done = asyncio.wait([_get_chunks(queue, size),
asyncio.sleep(timeout)],
return_when=asyncio.FIRST_COMPLETE)
items = done.pop().result()
但是当asyncio.sleep
首先返回时,我无法得到结果。
您无法获得结果,因为_get_chunks
尚未完成。一个简单的解决方法是在_get_chunks
及其调用方之间具有某种共享状态:
async def _get_chunks(queue, chunk_size, out):
for i in range(0, chunk_size):
out.append(await queue.get())
await asyncio.sleep(0.2)
然后您可以使用wait_for
实现超时,这将自动取消超时协程:
items = []
try:
asyncio.wait_for(_get_chunk(queue, size, items))
except asyncio.TimeoutError:
pass
# items now contains the elements _get_chunk managed to extract
# from the queue within the alotted time