我想在使用kafka-python
包时测试脚本。我想测试函数的返回对象类型
def _get_kafka_consumer() -> KafkaConsumer:
consumer = KafkaConsumer(bootstrap_servers=_KAFKA_BOOTSTRAP_SERVICE,
auto_offset_reset='earliest')
consumer.subscribe([_KAFKA_TOPIC_INPUT])
return consumer
我的测试课看起来像
class TestVoiceToText(unittest.TestCase):
def test_get_kafka_consumer_output_type(self):
result = _get_kafka_consumer()
self.assertIsInstance(result, KafkaConsumer)
当然它不会通过,因为没有运行Kafka集群,所以不能创建CCD_ 2。我如何嘲笑KafkaConsumer(...)
的返回是KafkaConsumer
类型的,而不需要调用实际的构造函数?
我设法通过使用unittest.mock
包中的patch
函数来解决问题:
def test_get_kafka_consumer_output_type(self):
with patch('voice_to_text.kafka_connector.KafkaConsumer') as kafka_consumer_class_mock:
kafka_consumer_instance = _get_kafka_consumer()
kafka_consumer_class_mock_instance = kafka_consumer_class_mock.return_value
kafka_consumer_class_mock.assert_called_once()
self.assertEquals(kafka_consumer_class_mock_instance, kafka_consumer_instance)
它只检查_get_kafka_consumer()
的结果是否实际上是通过调用函数KafkaConsumer()
返回的对象。我们不在乎这个函数到底在做什么。
您可能希望在mocking中使用spec
的概念。你可以在这里使用autospec
,我相信你会成功的。Specing将您的mock限制为您正在嘲笑的对象的原始API,并允许它也通过isinstance
测试。以下是有关自动采样的文档:https://docs.python.org/3.3/library/unittest.mock.html#autospeccing
以下是我如何做你的测试:
@mock.patch('voice_to_text.kafka_connector.KafkaConsumer', autospec=True)
def test_get_kafka_consumer_output_type(self, kafka_consumer_mock):
kafka_consumer_instance = _get_kafka_consumer()
kafka_consumer_mock.assert_called_once_with(
bootstrap_servers=_KAFKA_BOOTSTRAP_SERVICE,
auto_offset_reset='earliest')
kafka_consumer_mock.return_value.subscribe.assert_called_once_with(
[_KAFKA_TOPIC_INPUT])
kafka_consumer_mock.assert_called_once()
self.assertEquals(kafka_consumer_mock.return_value, kafka_consumer_instance)
self.assertIsInstance(kafka_consumer_instance, KafkaConsumer)