PYTEST固定装置和线程同步



我正在尝试使用pytest-xdist,以使我的测试并行运行,问题在于,每个线程都将转到共享所有测试并根据线程编号执行的固定装置。

它引起了我的问题,因为该固定装置的角色是为我的测试创建数据,并且一旦已经创建了它,我就会获得和错误,因为它已经创建了(通过REST)。

conftest.py:

lock = threading.Lock()
@pytest.fixture(scope=session)
def init_data(request):
    lock.acquire()
    try:
        data = []
        if checking_data_not_created():
            data.append(some_rest_command_creating_data_1())
            data.append(some_rest_command_creating_data_2())
    finally:
        lock.release()
    yield data
    lock.acquire()
    try:
        remove_all_data()
    finally:
        lock.release()

tests_class.py:

class classTests():
    def first_test(init_data):
        test body and validation....
     def second_test(init_data):
        test body and validation....

我正在使用命令:pytest -v -n2

假设第一线程应运行first_test()第二个线程应运行second_test()其中一个将始终失败,因为第一个已经在固定装置部分中创建了数据,另一个线程将获得异常,他应该运行的所有测试都将失败。

如您所见,我尝试使用锁来同步线程,但也行不通。

有什么想法我该如何克服这个问题?

谢谢。

这种方法将无法与pytest-xdist一起使用,因为这使用了多处理而不是多线程,但是可以使用-tests-per-per-per-worker选项将其与pytest-paraleral一起使用,它将运行测试使用多个线程。

数据仅设置一次,并在多线程pytest执行中清理一次,并使用以下固定装置进行:

conftest.py:

lock = threading.Lock()
threaded_count = 0
@pytest.fixture(scope='session')
def init_data():
    global lock
    global threaded_count
    lock.acquire()
    threaded_count += 1
    try:
        if threaded_count == 1:
            # Setup Data Once
            data = setup_data()
    finally:
        lock.release()
    yield data
    lock.acquire()
    threaded_count -= 1
    try:
        if threaded_count == 0:
            # Cleanup Data Once
            data = cleaup_data()
    finally:
        lock.release()

命令:

pytest -v-tests-per-worker 2

请注意,pytest-xdist不符合 'session'示例固定装置。pytest-XDIST中的'session'范围内的固定装置实际上意味着特定于过程的会话级灯具,因此它会为每个过程创建并分别拆除,并且在过程之间未共享固定装置的状态。有一些长期的建议可以在Pytest-XDIST上增加锁定和共享真正的会话式固定装置,但它们都遇到了许多经典的理由,不惜一切代价避免使用锁和锁和多处理。同步,结果它被pytest-xdist devs大大剥夺(可以理解的是)。

python xdist有一种方法。

我相信在您的示例中,应用他们的建议看起来像这样:

def create_data():
    data = []
    data.append(some_rest_command_creating_data_1())
    data.append(some_rest_command_creating_data_2())
    return data

@pytest.fixture(scope="session")
def session_data(request, tmp_path_factory, worker_id):
    if worker_id == "master":
        # Not running multiple processes, just create the data.
        data = create_data()
    else:
        # Running multiple processes, manage lockfile.
        root_tmp_dir = tmp_path_factory.getbasetemp().parent
        fn = root_tmp_dir / "data.json"
        with FileLock(str(fn) + ".lock"):
            if fn.is_file():
                # Data has been created, read it in.
                data = json.loads(fn.read_text())
            else:
                # Data not created, create and write it out.
                data = create_data()
                fn.write_text(json.dumps(data))
    yield data
    remove_all_data()

与您的示例不同,此示例是通过在 tmp_path_factory.getbasetemp().parent上保存的锁定文件提供的 data,该目录配有pytest。

最新更新