我必须将数据从我的测试用例传递到模拟服务器。最好的方法是什么?
这就是我到目前为止所拥有的
mock_server.py
class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
pass
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
def __init__(self, request, client_address, server):
SocketServer.BaseRequestHandler.__init__(self,request,client_address,server)
def handle(self):
print server.data #this is where i need the data
class server_wrap:
def __init__(self):
self.server = ThreadedUDPServer( ("127.0.0.1",49555) , ThreadedUDPRequestHandler)
def set_data(self,data)
self.server.data = data
def start(self)
server_thread = threading.Thread(target=self.server.serve_forever())
def stop(self)
self.server.shutdown()
test_mock.py
server_inst = server_wrap()
server_inst.start()
#code which sets the data and expects the handle method to print the data set
server_inst.stop()
我对这段代码的问题是,执行在 server_inst.start() 处停止,服务器进入无限侦听模式
我尝试过但失败的其他解决方案:
- 使用全局变量
- 使用队列
- 起始mock_server.py有自己的主
让我知道任何其他可能的解决方案。提前致谢
更新 1:
使用单独的线程将数据发送到套接字:
变化
test_mock.py
def test_set_data(data)
server_inst = server_wrap()
server_inst.set_data(data)
server_inst.start()
if __name__ == "__main__":
thread = Thread(target=test_set_data, args=("foo_data))
thread.setDaemon(True)
thread.start()
#test code which verifies if data set is same
#works so far, able to pass data
#problem starts now
thread = Thread(target=test_set_data, args=("bar_data))
thread.setDaemon(True)
thread.start()
#says address already in use error
#Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object
谢谢
服务器应进入侦听模式。
在发送所有数据并完成测试之前,不需要server_inst.stop
。也许在你的测试拆解中,或者当测试套件完成时。
要将数据发送到服务器,并让句柄选择它,您应该在 anohter 线程上打开一个套接字。然后通过此套接字将数据发送到服务器。
此代码应如下所示:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1",49555))
sock.send(... the data ...)
received = sock.recv(1024) # the handle can send a response
sock.close()
在你的 django 代码中添加一个函数,它确实在另一个线程上运行。此函数将打开套接字、连接、发送数据并获取响应。您可以从视图、中间件等调用它。