使用xdist在pytest下运行芹菜测试时出现死锁



如果我在没有涉及xdist的情况下运行,如下所示:

pytest --disable-warnings --verbose -s test_celery_chords.py

效果很好。我看到DB创建了,任务运行了,它按预期退出。

如果我运行时涉及到xdist(-n 2(,如下所示:

pytest --disable-warnings --verbose -n 2 -s test_celery_chords.py

我最终得到了一个挂起的进程(有时还有这些消息(:

Destroying old test database for alias 'default'...
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
[gw0] ERROR test_celery_chords.py::test_chords Destroying test database for alias 'default'...

结束它的唯一方法是^C

这是我的两个测试(本质上是同一个测试(。这些任务(简单的添加和平均示例测试(不需要DB,但其他使用DB的Django测试需要DB。

def test_chords(transactional_db, celery_app, celery_worker, celery_not_eager):
celery_app.config_from_object("django.conf:settings", namespace="CELERY")
task = do_average.delay()
results = task.get()
assert task.state == "SUCCESS"
assert len(results[0][1][1]) == 10

def test_chord_differently(transactional_db, celery_app, celery_worker, celery_not_eager):
celery_app.config_from_object("django.conf:settings", namespace="CELERY")
task = do_average.delay()
results = task.get()
assert task.state == "SUCCESS"
assert len(results[0][1][1]) == 10

和任务(应该无关紧要(

@shared_task
def _add(x: int, y: int) -> int:
print(f"{x} + {y} {time.time()}")
return x + y

@shared_task
def _average(numbers: List[int]) -> float:
print(f"AVERAGING {sum(numbers)} / {len(numbers)}")
return sum(numbers) / len(numbers)

@shared_task
def do_average():
tasks = [_add.s(i, i) for i in range(10)]
print(f"Creating chord of {len(tasks)} tasks at {time.time()}")
return chord(tasks)(_average.s())

使用以下内容的conftest.py:

@pytest.fixture
def celery_not_eager(settings):
settings.CELERY_TASK_ALWAYS_EAGER = False
settings.CELERY_TASK_EAGER_PROPAGATES = False

pytest夹具

celery_app -- .../python3.10/site packages/celery/contrib/pytest.py:173
Fixture creating a Celery application instance.
celery_worker -- .../python3.10/site-packages/celery/contrib/pytest.py:195
Fixture: Start worker in a thread, stop it when the test returns.

使用

django=4.1.2
pytest-celery==0.0.0
pytest-cov==3.0.0
pytest-django==4.5.2
pytest-xdist==2.5.0

虽然我还没有解决这个问题,但我已经找到了一种使用@pytest.mark.xdist_group(name="celery")来装饰测试类的变通方法,我可以执行以下操作:

@pytest.mark.xdist_group(name="celery")
@override_settings(CELERY_TASK_ALWAYS_EAGER=False)
@override_settings(CELERY_TASK_EAGER_PROPAGATES=False)
class SyncTaskTestCase2(TransactionTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.celery_worker = start_worker(app, perform_ping_check=False)
cls.celery_worker.__enter__()
print(f"Celery Worker started {time.time()}")
@classmethod
def tearDownClass(cls):
print(f"Tearing down Superclass {time.time()}")
super().tearDownClass()
print(f"Tore down Superclass {time.time()}")
cls.celery_worker.__exit__(None, None, None)
print(f"Celery Worker torn down {time.time()}")
def test_success(self):
print(f"Starting test at {time.time()}")
self.task = do_average_in_chord.delay()
self.task.get()
print(f"Finished Averaging at {time.time()}")
assert self.task.successful()

这与命令行选项--dist loadgroup相结合;芹菜";组在同一个runner进程上运行,从而防止死锁并允许-numprocesses 10运行到完成。

这里最大的缺点是9秒的惩罚,让芹菜工人崩溃,这会让你倾向于把所有的芹菜测试都推到一个类中。

# This accomplishes the same things as the unitest above WITHOUT having a Class wrapped around the tests it also eliminates the 9 second teardown wait.
@pytest.mark.xdist_group(name="celery")
@pytest.mark.django_db # Why do I need this and transactional_db???
def test_averaging_in_a_chord(
transactional_db,
celery_session_app,
celery_session_worker,
use_actual_celery_worker,
):
task = do_average_in_chord.delay()
task.get()
assert task.successful()

你确实需要这个在你的conftest.py

from typing import Type
import time
import pytest
from pytest_django.fixtures import SettingsWrapper
from celery import Celery
from celery.contrib.testing.worker import start_worker

@pytest.fixture(scope="function")
def use_actual_celery_worker(settings: SettingsWrapper) -> SettingsWrapper:
"""Turns of CELERY_TASK_ALWAYS_EAGER and CELERY_TASK_EAGER_PROPAGATES for a single test. """
settings.CELERY_TASK_ALWAYS_EAGER = False
settings.CELERY_TASK_EAGER_PROPAGATES = False
return settings

@pytest.fixture(scope="session")
def celery_session_worker(celery_session_app: Celery):
"""Re-implemented this so that my celery app gets used.  This keeps the priority queue stuff the same
as it is in production.  If BROKER_BACKEND is set to "memory" then rabbit shouldn't be involved anyway."""
celery_worker = start_worker(
celery_session_app, perform_ping_check=False, shutdown_timeout=0.5
)
celery_worker.__enter__()
yield celery_worker
# This causes the worker to exit immediately so that we don't have a 9 second wait for the timeout.
celery_session_app.control.shutdown()
print(f"Tearing down Celery Worker {time.time()}")
celery_worker.__exit__(None, None, None)
print(f"Celery Worker torn down {time.time()}")

@pytest.fixture(scope="session")
def celery_session_app() -> Celery:
from workshop.celery import app
""" Get the app you would regularly use for celery tasks and return it.  This insures all of your default
app options mirror what you use at runtime."""
yield app

相关内容

  • 没有找到相关文章

最新更新