我创建了一个依赖于其他模块的芹菜任务。我在模型保存方法中延迟了芹菜任务,我想通过所有没有芹菜的测试用例。下面是我的代码。
def save(self, force_insert=False, force_update=False, using=None,
update_fields=None):
super(Offers, self).save(force_insert=False, force_update=False, using=None,
update_fields=None)
change_offer_status.apply_async(args=[self.id], eta=self.valid_to, queue='credr_core_task',
routing_key='credr_core_task')
Test.py
class OfferTests(APITestCase):
authenticated_client = APIClient()
def setUp(self):
data = {
"username": "vivek",
"first_name": "Vivek",
"last_name": "Dogra",
"email": "vivk@credr.com",
"contact_number": "9834982602",
"password": "easy"
}
self.user = User.objects.create(data)
token = Token.objects.get(user__username='vivek')
self.authenticated_client.credentials(HTTP_AUTHORIZATION='Token ' + token.key)
mock_task = Mock()
mock_task.get = Mock(return_value={'success': True})
print mock_task.get() # outputs {'success': True}
with patch('offers.tasks.change_offer_status.apply_async', new=mock_task) as mocked_task:
mocked_task.return_value = True
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_ALWAYS_EAGER=True, )
def test_add_offer(self):
"""
add user address
"""
start_date = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
end_date = datetime.strftime(datetime.now() + timedelta(days=3), "%Y-%m-%d %H:%M:%S")
data = {
"type": OfferTypeEnum.FLAT._value_,
"name": "CredR Offer",
"code": "FLAT100",
"status": OfferStatusEnum.ACTIVE._value_,
"value": "1004",
"discount_value": 1004,
"valid_from": start_date,
"valid_to": end_date,
"message": "GET FLAT 100"
}
response = self.authenticated_client.post(URL, data, format='json')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
我在 setUp 方法中创建了一个模拟对象。但是我收到兔子Mq的连接被拒绝错误。
我想通过所有没有芹菜的测试用例
在运行测试时设置CELERY_ALWAYS_EAGER = True
。
[...]任务将在本地执行,而不是发送到队列。
阅读此处的其他配置选项也可能是值得的:http://docs.celeryproject.org/en/latest/configuration.html#celery-always-eager
从你的话"...我想通过所有没有芹菜的测试用例。但是我收到rabbitMq的连接被拒绝错误",我猜您正在尝试在不触发真正的芹菜任务的情况下运行测试。
如果这是真的,你不想嘲笑apply_async
.真正的要求是防止芹菜向队列发送消息以触发任务。所以你可能想嘲笑你的芹菜应用程序send_task
。
表示您在offer
模块中有celeryapp
实例。您可以尝试使用 patch('offers.celeryapp.send_task', new=mock_task)
要替换
patch('offers.tasks.change_offer_status.apply_async', new=mock_task)
.
然后,您的芹菜应用程序不应发送消息。
如果您使用的是 Celery 4+(我忘记了详细版本),则需要将CELERY_ALWAYS_EAGER
更改为 CELERY_TASK_ALWAYS_EAGER