异步从异步发电机呈现



我希望能够从多个异步旋钮中产生。ASYNCIO的as_completed有点接近我要寻找的东西(即,我希望任何一个Coroutines能够随时回到呼叫者,然后继续(,但这似乎只允许带有单一返回的常规Coroutines。

这是我到目前为止所拥有的:

import asyncio

async def test(id_):
    print(f'{id_} sleeping')
    await asyncio.sleep(id_)
    return id_

async def test_gen(id_):
    count = 0
    while True:
        print(f'{id_} sleeping')
        await asyncio.sleep(id_)
        yield id_
        count += 1
        if count > 5:
            return

async def main():
    runs = [test(i) for i in range(3)]
    for i in asyncio.as_completed(runs):
        i = await i
        print(f'{i} yielded')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

runs = [test_gen(i) for i in range(3)]替换runs = [test(i) for i in range(3)],并以for i in asyncio.as_completed(runs)迭代每个收益率是我追求的。

这是否可以在Python中表达,并且是否有第三方可以为您提供更多选择,而不是Coroutine Process流的标准库?

谢谢

您可以使用aiostream.stream.merge:

from aiostream import stream
async def main():
    runs = [test_gen(i) for i in range(3)]
    async for x in stream.merge(*runs):
        print(f'{x} yielded')

在安全的上下文中运行它,以确保在迭代后正确清理发电机:

async def main():
    runs = [test_gen(i) for i in range(3)]
    merged = stream.merge(*runs)
    async with merged.stream() as streamer:
        async for x in streamer:
            print(f'{x} yielded')

或使用管道使其更紧凑:

from aiostream import stream, pipe
async def main():
    runs = [test_gen(i) for i in range(3)]
    await (stream.merge(*runs) | pipe.print('{} yielded'))

文档中的更多示例。


adressing @nirvana-msu评论

可以通过相应地准备来源来识别产生给定值的发电机:

async def main():
    runs = [test_gen(i) for i in range(3)]
    sources = [stream.map(xs, lambda x: (i, x)) for i, xs in enumerate(runs)]
    async for i, x in stream.merge(*sources):
        print(f'ID {i}: {x}')

相关内容

  • 没有找到相关文章

最新更新