地址已在python中的异步客户端测试中使用



我有一个关于异步客户端测试的问题。这是我关于测试的代码

class TestSocketClient:
@classmethod
def setup_class(cls):
# enable parallel testing by adding the pytest worker id number to the default port
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "gw0")
worker_number = int(worker_id[2:])
cls.mock_server = ServerMock()
cls.mock_server.port = ServerMock.port + worker_number
cls.username = os.environ.get("USERNAME", "")
cls.password = os.environ.get("PASSWORD", "")
cls.socket_client = SocketClient(username=cls.username, password=cls.password)
cls.socket_client.hostname = "0.0.0.0"
cls.socket_client.port = SocketClient.port + worker_number
@pytest.mark.asyncio
async def test_login(self):
await self.mock_server.start()
response = await self.socket_client.login()
assert response == actual_response
await self.socket_client.close()
@pytest.mark.asyncio
async def test_send_heartbeat(self):
await self.mock_server.start()
await self.socket_client.login()
await self.socket_client.send_heartbeat()
await self.socket_client.close()

我可以在TestSocketClient下单独运行测试,它们将单独通过。但是,当我将测试套件与pytest -n auto一起运行时,后一个测试用例将引发error while attempting to bind on address ('0.0.0.0', 2056): address already in use。我的问题是如何在不解决分配问题的情况下使测试套件通过,以便它们能够在CI过程中成功运行。如果在编写异步测试方面有一些更有价值的建议,我将不胜感激(例如,如果我只想测试客户端想要发送到服务器的请求,我应该断言什么?我应该断言在服务器端收到的消息,还是只在客户端写assert_called_once之类的东西(。提前感谢!

更新:

我终于在不同的测试中解决了端口增量的问题,比如下面的

class TestSocketClient:
ports_taken = set()
@classmethod
def setup_class(cls):
cls.mock_server = ServerMock()
cls.username = os.environ.get("USERNAME", "")
cls.password = os.environ.get("PASSWORD", "")
cls.socket_client = SocketClient(username=cls.username, password=cls.password)
cls.socket_client.hostname = "0.0.0.0"
cls.socket_client.port = cls.mock_server.port

def bump(self):
if len(self.ports_taken) == 0:
self.ports_taken.add(self.mock_server.port)
new_port = max(self.ports_taken) + 1
self.mock_server.port = new_port
self.socket_client.port = new_port
self.ports_taken.add(self.mock_server.port)

async def start(self):
self.bump()
try:
await self.mock_server.start()
except:
self.bump()
await self.mock_server.start()

@pytest.mark.asyncio
async def test_login(self):
await self.start()
...

希望这能有所帮助!

对于address already in use,通过检查当前运行的进程

ps -ax | grep <your app/app port>

在本文中,您将注意到该流程已经存在。请。使用sudo kill -9 <process_id>终止该进程

现在重新启动您的服务。我不确定异步测试。

最新更新