是否有相当于线程世界 yield() 或 sleep(0) 函数的 python 异步?



我正在编写一些单元测试(使用asynctest(。想象一个最小的函数:

async def foo(self):
await self._bar_()

。与关联的单元测试:

import asynctest as unittest
from unittest.mock import Mock
async def test_func(self):
obj._bar_ = Mock()
await obj.foo()
obj._bar_.assert_awaited() # incorrectly asserts that bar hasn't been awaited

为了解决注释中提出的一个问题:asynctest 是我用于 unittest asyncio 代码的库,它支持此代码中提到的所有 unittest 功能的直接替换。具体如文档中所述:

增强 unittest.mock.Mock,使其返回 CoroutineMock 对象,而不是 Mock 对象,其中规范或spec_set对象上的方法是一个协程。


这在表面上不起作用,因为事件循环仅在当前test_func()任务生成后才会启动等待的_bar_()方法。

Asynctest 提供了一种使用Mock().awaited.wait()进行同步的机制。

async def test_func(self):
obj._bar_ = Mock()
await obj.foo()
await obj._bar_.awaited.wait()
obj._bar_.assert_awaited() # correctly passes assertion test

上述方法有效,但是如果我们想用assert_not_awaited进行测试,这是行不通的。假设我有另一个功能:

async def baz(self):
# do some stuff but never await on _bar_

async def test_func(self):
obj._bar_ = Mock()
await obj.baz()
await obj._bar_.awaited.wait() # hangs forever
obj._baz_.assert_not_awaited()

我发现可靠地测量此测试的唯一方法是执行以下操作:

async def test_func(self):
obj._bar_ = Mock()
await obj.baz()
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(obj._bar_.awaited.wait(), timeout=0.01) # see note below on timeout
obj._bar_.assert_not_awaited() # correctly asserts that `_bar_` was awaited

重要提示:这实际上意味着我总是必须使用这种方法,否则我会自欺欺人地认为我的断言成立。

注意:我已经尝试过这种情况,超时为 0,并且行为恢复到第一种情况(即wait_for只是立即返回而不重新协调任务调度(。

我对这里发生的事情的理解是,超时wait_for本质上就像线程世界中的yield()sleep(0)方法:即它将当前任务放在事件循环队列的末尾,并允许刷新所有其他当前挂起的任务。

所以我的问题是,是否有一种asyncio原生的方法来实现这个目标,即产生当前任务以允许通风所有其他当前未决的任务?

也许你正在寻找

await asyncio.sleep(0)

如文档中所述:

跳过一个事件循环运行周期。

这是"asyncio.sleep(("的私人助手,使用 当"延迟"设置为 0 时。 它使用裸露的"产量" 表达式(Task.__step知道如何处理( 而不是创建 Future 对象。

最新更新