在python单元测试中,如果使用asyncio.ensure_future(…(调用函数,我如何断言该函数被调用?我有一个简单的测试失败了:
async def test_assert_ensure_future_called():
async def increment():
increment.call_count += 1
increment.call_count = 0
asyncio.ensure_future(increment())
asyncio.ensure_future(increment())
asyncio.ensure_future(increment())
# await asyncio.sleep(0) # this allows the asyncio.ensure_future calls to run
assert increment.call_count == 3 # this fails because the calls to increment() haven't happened yet
我发现,如果在assert语句之前插入一个类似await asyncio.sleep(0)
的调用,那么测试就会成功。我认为这是有效的,因为它降低了测试任务的优先级,并让对增量的调用先行。这是测试中正确的方法吗?
保存这些任务的引用,并等待它为每个任务调用await
或为一行使用asyncio.gather
。
import asyncio
async def test_assert_ensure_future_called():
async def increment():
increment.call_count += 1
increment.call_count = 0
tasks = [asyncio.ensure_future(increment()) for _ in range(3)]
await asyncio.gather(*tasks)
assert increment.call_count == 3
asyncio.run(test_assert_ensure_future_called())