pytest与会话范围的fixture和asyncio有关



我有多个测试文件,每个文件都有一个异步固定装置,如下所示:


@pytest.fixture(scope="module")
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()

@pytest.fixture(scope="module")
async def some_fixture():
return await make_fixture()

我正在使用xdist进行并行化。此外,我还有这个装饰器:

@toolz.curry
def throttle(limit, f):
semaphore = asyncio.Semaphore(limit)
@functools.wraps(f)
async def wrapped(*args, **kwargs):
async with semaphore:
return await f(*args, **kwargs)
return wrapped

我有一个使用它的函数:

@throttle(10)
def f():
...

现在从多个测试文件调用f,我收到一个异常,告诉我不能使用来自不同事件循环的信号量。

我尝试转移到会话级别的事件循环固定装置:


@pytest.fixture(scope="session", autouse=True)
def event_loop(request):
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()

但这只给了我:

Scopeismatch:您试图使用"模块"范围的请求对象访问"函数"范围的fixture"event_loop",涉及工厂

是否有可能让xdist+异步fixture+信号量一起工作?

最终使用以下conftest.py:使其工作

import asyncio
import pytest

@pytest.fixture(scope="session")
def event_loop():
return asyncio.get_event_loop()

最新更新