测试 Azure 服务总线队列的发送和接收消息



我想写一个集成测试,检查Python脚本与Azure服务总线队列的连接。测试应:

  1. 向队列发送消息
  2. 确认消息已进入队列

测试如下:

import pytest
from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusSender
CONNECTION_STRING = <some connection string>
QUEUE = <queue name>

def send_message_to_service_bus(sender: ServiceBusSender, msg: str) -> None:
message = ServiceBusMessage(msg)
sender.send_message(message)

class TestConnectionWithQueue:
def test_message_is_sent_to_queue_and_received(self):
msg = "test message sent to queue"
expected_message = ServiceBusMessage(msg)

servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING, logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE)
with sender:
send_message_to_service_bus(sender, expected_message)

receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE)
with receiver:
messages_in_queue = receiver.receive_messages(max_message_count=10, max_wait_time=20)
assert any(expected_message == str(actual_message) for actual_message in messages_in_queue)

这个测试偶尔会起作用,但通常不会。没有其他消息同时发送到队列。在调试代码时,如果测试不起作用,那么变量messages_in_queue只是一个空列表。

为什么代码不能一直工作?应该做些什么来修复它?

您确定没有其他进程接收您的消息吗?也许你正在与其他同事共享队列连接字符串,构建机器。。。

若要进行故障排除,您需要密切关注Azure门户上的队列监控。调试测试并查看传入消息(如果增量为1(。然后继续调试并检查它是否递减1。

另外,你确定这个单元测试有用吗?看起来你是在测试你的基础设施,而不是测试你的代码

最新更新