使用Behave测试asyncio应用程序



是否可以基于asyncio事件测试应用程序?如果没有,用python测试进程内事件总线的测试策略是什么?

我的应用程序是基于用户使用事件总线在系统之间进行通信,而sismic作为状态机。

在我的应用程序中,我使用aiorun启动它,如下所示:

from aiorun import run
async def main():
app = TestApplication()
await shutdown_waits_for(app.cleanup())
run(main())

在这种情况下,TestApplication是一个测试存根,它在状态机和监听事件总线的各个子系统之间进行事件路由。

我想做的是测试系统之间的通信。要运行,GuiApplication必须在事件循环中运行,因此给出的步骤定义(不工作)将是这样的:

@when('I wait for message {message} to be fired containing {key}="{value}"')
@then('message {message} is fired containing {key}="{value}"')
@async_run_until_complete(timeout=1.2)
async def step_impl5(context, message, key=None, value=None):
dict = {}
if key:
dict[key] = value
ta = TestApplication()
await ta.wait_for_message(message, dict)

但是,以上以RuntimeError: This event loop is already running

结束我使用nest_asyncio:

有一点成功
import nest_asyncio
nest_asyncio.apply()

但这似乎也不对。

简而言之,我如何利用现有的事件循环进行测试,或者我可以吗?

您有两个步骤装饰器,@when@then,用于相同的步骤实现。每一步只能有一个。

@when('I wait for message {message} to be fired containing {key}="{value}"')
def step_impl4(context, message, key=None, value=None):
# set up context
pass
@then('message {message} is fired containing {key}="{value}"')
@async_run_until_complete(timeout=1.2)
async def step_impl5(context, message, key=None, value=None):
dict = {}
if context.key:
dict[key] = context.value
ta = TestApplication()
await ta.wait_for_message(message, dict)

最新更新