如何在python中使用RabbitMQ进行简单的单元测试



在我的单元测试中,我只想开始消费,发布一条消息,然后接收一个响应,并断言响应是否是我期望的。然而,我已经尝试了几个小时,但没有找到解决方案。

问题是,我无法在类中定义一个停止消耗的方法。我尝试过定义这样一种方法:

def stop(self):
self.channel.basic_cancel()
def stop(self):
self.channel.stop_consuming()
def stop(self):
self.connection.close()

但似乎什么都不起作用。我读到这是因为一旦执行start_consuming(),停止消费的唯一方法就是在发送消息后取消它。但如果我这样做,那么我将修改原始的on_request,这对我的应用程序没有用处,因为连接将在第一条消息之后关闭。我已经找到了pytestrabbitmq,但文档对我来说并不清楚,因此不知道我是否可以使用这个插件来实现我想要的。

顺便问一下,basic_cancelstop_consumingclose之间有什么区别?

我对您的场景没有一个清晰的概念!。据我所知,你可以用同样的方法创建连接和通道,这样你就可以在需要时发布、消费、断言和停止消费

希望这能有所帮助!

def test_rabbitmq():
from pika import BlockingConnection, ConnectionParameters, PlainCredentials
conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
channel = conn.channel()
# define your consumer
def on_message(channel, method_frame, header_frame, body):
message = body.decode()
# assert your message here
# asset message == 'value'
channel.basic_cancel('test-consumer')  # stops the consumer
# define your publisher
def publish_message(message):
channel.basic_publish(exchange='', routing_key='', body=message')
publish('your message')
tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')

stop_consuming-取消所有使用者,发出退出start_consuming循环的信号。

basic_cancel-此方法取消使用者。消费者标签将作为输入。

close-关闭连接/通道

参考

下面是pytest rabbitmq-plugin的一个示例:

import asyncio
import rabbitpy

async def test_queue(fastapi_client, mocker, rabbitmq):
mock_consumer_method = mocker.MagicMock(name="consumer_method")
mocker.patch(
"my.project.consumer_method",
new=mock_consumer_method,
)
connection_details = rabbitmq.args
os.environ[
"RABBITMQ_CONNECTION_STRING"
] = f"amqp://{connection_details['username']}:{connection_details['password']}@{connection_details['host']}:{connection_details['port']}/"  # this is used by my consumer
channel = rabbitmq.channel()
queue = rabbitpy.Queue(
channel, name="QUEUE_NAME", durable=False, message_ttl=5 * 60 * 1000
)
queue.declare()
message = rabbitpy.Message(channel, "some message")
message.publish(exchange="", routing_key=PRODUCE_QUEUE_NAME)
with fastapi_client:  # triggers the consumer startup event
await asyncio.sleep(20)
mock_consumer_method.assert_called_once_with("some message")

我已经有一个消费者作为我的FastAPI服务器的一部分,您的用例可能会有所不同。

最新更新