在Python中,如何测试进行asyncio.ensure_future(..)调用的函数



在python单元测试中,如果使用asyncio.ensure_future(…(调用函数,我如何断言该函数被调用?我有一个简单的测试失败了:

async def test_assert_ensure_future_called():
async def increment():
increment.call_count += 1
increment.call_count = 0
asyncio.ensure_future(increment())
asyncio.ensure_future(increment())
asyncio.ensure_future(increment())
# await asyncio.sleep(0)  # this allows the asyncio.ensure_future calls to run
assert increment.call_count == 3  # this fails because the calls to increment() haven't happened yet

我发现,如果在assert语句之前插入一个类似await asyncio.sleep(0)的调用,那么测试就会成功。我认为这是有效的,因为它降低了测试任务的优先级,并让对增量的调用先行。这是测试中正确的方法吗?

保存这些任务的引用,并等待它为每个任务调用await或为一行使用asyncio.gather

import asyncio

async def test_assert_ensure_future_called():
async def increment():
increment.call_count += 1
increment.call_count = 0

tasks = [asyncio.ensure_future(increment()) for _ in range(3)]
await asyncio.gather(*tasks)
assert increment.call_count == 3

asyncio.run(test_assert_ensure_future_called())

最新更新