是否可以基于asyncio
事件测试应用程序?如果没有,用python测试进程内事件总线的测试策略是什么?
我的应用程序是基于用户使用事件总线在系统之间进行通信,而sismic作为状态机。
在我的应用程序中,我使用aiorun
启动它,如下所示:
from aiorun import run
async def main():
app = TestApplication()
await shutdown_waits_for(app.cleanup())
run(main())
在这种情况下,TestApplication
是一个测试存根,它在状态机和监听事件总线的各个子系统之间进行事件路由。
我想做的是测试系统之间的通信。要运行,GuiApplication
必须在事件循环中运行,因此给出的步骤定义(不工作)将是这样的:
@when('I wait for message {message} to be fired containing {key}="{value}"')
@then('message {message} is fired containing {key}="{value}"')
@async_run_until_complete(timeout=1.2)
async def step_impl5(context, message, key=None, value=None):
dict = {}
if key:
dict[key] = value
ta = TestApplication()
await ta.wait_for_message(message, dict)
但是,以上以RuntimeError: This event loop is already running
结束我使用nest_asyncio:
有一点成功import nest_asyncio
nest_asyncio.apply()
但这似乎也不对。
简而言之,我如何利用现有的事件循环进行测试,或者我可以吗?
您有两个步骤装饰器,@when
和@then
,用于相同的步骤实现。每一步只能有一个。
@when('I wait for message {message} to be fired containing {key}="{value}"')
def step_impl4(context, message, key=None, value=None):
# set up context
pass
@then('message {message} is fired containing {key}="{value}"')
@async_run_until_complete(timeout=1.2)
async def step_impl5(context, message, key=None, value=None):
dict = {}
if context.key:
dict[key] = context.value
ta = TestApplication()
await ta.wait_for_message(message, dict)