用于芹菜单元测试的内存经纪人



我在django上写了一个REST API,并在发布时将端点排队一个芹菜任务。响应包含我想用来测试创建任务并获得结果的任务ID。所以,我想做类似的事情:

def test_async_job():
    response = self.client.post("/api/jobs/", some_test_data, format="json")
    task_id = response.data['task_id']
    result = my_task.AsyncResult(task_id).get()
    self.assertEquals(result, ...)

我显然不想经营芹菜工人来运行单元测试,我希望以某种方式嘲笑它。我不能使用celery_always_eager,因为这似乎完全绕过了经纪人,从而阻止我使用asyncresult通过其ID来完成任务(如下所述)。

浏览芹菜和KOMBU文档,我发现有一种内存的运输单元测试,可以做我想要的东西。我尝试覆盖BROKER_URL设置以在测试中使用它:

@override_settings(BROKER_URL='memory://')
def test_async_job():

,但行为与AMPQ代理相同:它阻止了等待结果的测试。知道我应该如何配置这个经纪人以使其在测试中工作?

您可以在设置中指定Broker_backend:

if 'test' in sys.argv[1:]:
    BROKER_BACKEND = 'memory'
    CELERY_TASK_ALWAYS_EAGER = True
    CELERY_TASK_EAGER_PROPAGATES = True

或者您可以直接在测试中使用装饰器覆盖设置

import unittest
from django.test.utils import override_settings

class MyTestCase(unittest.TestCase):
    @override_settings(CELERY_TASK_EAGER_PROPAGATES=True,
                       CELERY_TASK_ALWAYS_EAGER=True,
                       BROKER_BACKEND='memory')
    def test_mytask(self):
        ...

您可以使用kombu内存经纪人运行单元测试,但是要这样做,您需要使用与Django服务器相同的芹菜应用对象旋转芹菜工人。<<<<<<<<<<<<<<

要使用内存经纪人,将Broker_url设置为memory://localhost/

然后,要旋转一个小型芹菜工人,您可以执行以下操作:

app = <Django Celery App>
# Set the worker up to run in-place instead of using a pool
app.conf.CELERYD_CONCURRENCY = 1
app.conf.CELERYD_POOL = 'solo'
# Code to start the worker
def run_worker():
    app.worker_main()
# Create a thread and run the worker in it
import threading
t = threading.Thread(target=run_worker)
t.setDaemon(True)
t.start()

您需要确保使用与Django芹菜应用程序实例相同的应用程序。

请注意,启动工人将打印许多内容并修改记录设置。

这是与芹菜4.x一起使用的Django TransactionTestCase的更完整的示例。

import threading
from django.test import TransactionTestCase
from django.db import connections
from myproj.celery import app  # your Celery app

class CeleryTestCase(TransactionTestCase):
    """Test case with Celery support."""
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        app.control.purge()
        cls._worker = app.Worker(app=app, pool='solo', concurrency=1)
        connections.close_all()
        cls._thread = threading.Thread(target=cls._worker.start)
        cls._thread.daemon = True
        cls._thread.start()
    @classmethod
    def tearDownClass(cls):
        cls._worker.stop()
        super().tearDownClass()

请注意,这不会将您的队列名称更改为测试队列,因此,如果您还运行了该应用程序,则也需要执行此操作。

用于在django中的所有芹菜测试中使用内存经纪人,可以使用此pytest夹具。

test.py

import pytest
from django.conf import settings
from django.test import TestCase

class TestStartFeatureDetectionView(TestCase):
    
    @pytest.fixture(autouse=True)
    def set_celery_broker_to_memory(self):
        settings.BROKER_BACKEND = 'memory://'
        settings.CELERY_BROKER_URL = 'memory://'
        settings.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

SQLalchemy的SQLite数据库用于存储结果。

相关内容

  • 没有找到相关文章

最新更新