异步python-itertools链接多个生成器



为澄清起见,更新了问题:

假设我有两个处理生成器函数:

def gen1(): # just for examples,
yield 1   # yields actually carry 
yield 2   # different computation weight 
yield 3   # in my case
def gen2():
yield 4
yield 5
yield 6

我可以用itertools 链接它们

from itertools import chain
mix = chain(gen1(), gen2())

然后我可以用它创建另一个生成器函数对象,

def mix_yield():
for item in mix:
yield item

或者简单地说,如果我只想next(mix),它就在那里。

我的问题是,如何在异步代码中实现等效功能

因为我需要它:

  • 以yield(逐个(返回,或使用next迭代器
  • 最快解析的yield first(async(

PREV。更新:

经过实验和研究,我发现了aiostream库,它是itertools的异步版本,所以我做了什么:

import asyncio
from aiostream import stream
async def gen1(): 
await asyncio.sleep(0) 
yield 1 
await asyncio.sleep(0) 
yield 2 
await asyncio.sleep(0) 
yield 3 
async def gen2(): 
await asyncio.sleep(0) 
yield 4 
await asyncio.sleep(0) 
yield 5 
await asyncio.sleep(0) 
yield 6 
a_mix = stream.combine.merge(gen1(),gen2())
async def a_mix_yield():
for item in a_mix:
yield item

但我还是不能做next(a_mix)

TypeError: 'merge' object is not an iterator

next(await a_mix)

raise StreamEmpty()

尽管我仍然可以将其列入列表:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

因此,一个目标已经完成,还有一个目标:

  • 以收益率返回(逐个(,或使用next迭代器

  • -最快的解析收益率优先(异步(

Python的next内置函数只是调用对象上底层__next__方法的一种方便方式。__next__的异步等价物是异步迭代器上的__anext__方法。标准库中没有anext全局函数(aiostream库提供了一个(,但可以很容易地编写它:

async def anext(aiterator):
return await aiterator.__anext__()

但节省的费用太少了,在极少数情况下,当需要时,还不如直接调用__anext__。异步迭代器是通过调用__aiter__(类似于由常规迭代提供的__iter__(从异步迭代中获得的。手动驱动的异步迭代如下所示:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

当没有更多的元素可用时,CCD_ 14将引发CCD_。要在异步迭代器上循环,应该使用async for

下面是一个可运行的示例,基于您的代码,使用__anext__async for来耗尽使用aiostream.stream.combine.merge:设置的流

async def main():
a_mix = stream.combine.merge(gen1(), gen2())
async with a_mix.stream() as streamer:
mix_iter = streamer.__aiter__()    
print(await mix_iter.__anext__())
print(await mix_iter.__anext__())
print('remaining:')
async for x in mix_iter:
print(x)
asyncio.get_event_loop().run_until_complete(main())

我发现了这个答案,并查看了aiostream库。以下是我用来合并多个异步生成器的代码。它不使用任何库。

async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
pending = gens.copy()
pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
while len(pending_tasks) > 0:
done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
for d in done:
try:
result = d.result()
yield result
dg = pending_tasks[d]
pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
except StopAsyncIteration as sai:
print("Exception in getting result", sai)
finally:
del pending_tasks[d]

希望这能帮助你,如果有任何错误,请告诉我。

最新更新