在端点内测试celery.send_task()



我有这样的配置(用于演示)

endpoints.py 
celery_conf.py

在芹菜客户端内部是芹菜的配置设置,在endpoint .py内部有一个端点,例如celery_client被导入。在endpoint .py中导入celery_client(实例化的Celery()对象)

#in endpoints.py
from celery_conf import celery_client
@router.post(
include_in_schema=True,
status_code=status.HTTP_200_OK,
name="some_name:post"
)
def foo_endpoint(
item: PydanticModel, db: Session = Depends(get_database)
) -> dict:
tmp = <some preprocessing of item>
celery_client.send_task(...)
return 200

我想测试这个端点,看看是否已经调用了celery_client.send_task()。我该怎么做呢?我读过pytest补丁功能,但是我不知道如何测试它。

假设我有这样一个测试:

client = TestClient() #fastapi test client
def test_enpoint():
#patch where celery client is imported
with patch('endpoints.celery_client') as mock_task:
client.put(url=app.url_path_for("some_name:post"), data={})
...

如何测试celery_client.send_task()是否已在端点内调用?

你可以这样做:

with patch("endpoints.celery_client.send_task") as mock_task:
client.put(url=app.url_path_for("some_name:post"), data={})
assert mock_task.call_count == 1
assert mock_task.call_args

或者pytest-mock包也可以提供帮助:

def test_endpoint(mocker: MockerFixture):
mock_task = mocker.patch("endpoints.celery_client.send_task")
client.put(url=app.url_path_for("some_name:post"), data={})
mock_task.assert_called_once()
mock_task.assert_called_once_with(arg1, arg2)

相关内容

  • 没有找到相关文章

最新更新