python3.6 新更改是否'async for'与枚举不兼容



我正在尝试将我的应用程序从python2.7移动到python3.6,用于asyncio和相关库,但我发现某些功能无法正常工作。我使用电机从mongodb异步查询,例如:

async def do_query():
    song_already_processed = set()
    song_table = db.song_table
    async for index, item in enumerate(song_table.find({'lang': 'English'},
                              {'id': 1, '_id': 0, 'title': 1, 'artist.name': 1})):
        if index > 100:
            break
        if item['id'] in song_already_processed:
            continue
        song_already_processed.add(item['id'])
    print(len(song_already_processed))

但它引发了错误:

TypeError: 'AsyncIOMotorCursor' object is not iterable

我认为异步迭代器协议与普通的交互器协议不同,因此枚举效果不佳。是否有可以使用的异步枚举?

顺便说一句,我知道很多方法可以只获取 100 个文档并停止迭代,我只想知道如何正确使用"异步">

asyncstdlib库(免责声明:我维护此软件包(提供了标准库帮助程序的async变体。具体来说,asyncstdlib.enumerate的工作方式类似于enumerate,但采用并生成异步可迭代对象。

import asyncstdlib as a
async for index, item in a.enumerate(song_table.find(...)):
    if index > 100:
        break
    ...

请注意,从异步迭代中break通常不是一个好主意 - 迭代器可能不会在迭代结束时进行清理(有关详细信息,请参阅 PEP 533(。

由于您使用index只是为了获取前 100 个项目,因此您还可以使用 asyncstdlib.islice 直接安全地限制迭代:

import asyncstdlib as a
async for item in a.islice(song_table.find(...), 100):
    ...

Python3.6 添加了异步生成器,它允许轻松实现异步枚举:

async def aenumerate(asequence, start=0):
    """Asynchronously enumerate an async iterator from a given start value"""
    n = start
    async for elem in asequence:
        yield n, elem
        n += 1

对于旧版本的 Python,必须手动调整异步生成器:

class AsyncEnumerate:
    """Asynchronously enumerate an async iterator from a given start value"""
    def __init__(self, asequence, start=0):
        self._asequence = asequence
        self._value = start
    async def __anext__(self):
        elem = await self._asequence.__anext__()
        value, self._value = self._value, self._value + 1
        return value, elem
    def __aiter__(self):
        return self

如果你不介意有一个外部依赖,你可以使用 aiostream.stream.enumerate:

async for i, item in aiostream.stream.enumerate(cursor):
    ...

请参阅此演示和文档中的更多示例。

我有一个类似的问题,涉及使用异步生成器作为使用枚举和异步的可迭代对象。我意识到在我的情况下最简单的答案是通过修改异步生成器来实际返回索引以返回索引,而不是异步。见下文:

async def whitespace(data: List) -> tuple:
    """function to remove any whitespace in returned values"""
    for i, v in enumerate(data):
        yield i, x.strip()
async for i, x in whitespace(a_list_of_stuff):
    a_list_of_stuff[i] = x 

最新更新