将数据异步传递到 Python 服务器类



我必须将数据从我的测试用例传递到模拟服务器。最好的方法是什么?

这就是我到目前为止所拥有的

mock_server.py

class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
    pass
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
    def __init__(self, request, client_address, server):
        SocketServer.BaseRequestHandler.__init__(self,request,client_address,server)
    def handle(self):
        print server.data #this is where i need the data 
class server_wrap:
     def __init__(self):
        self.server = ThreadedUDPServer( ("127.0.0.1",49555) , ThreadedUDPRequestHandler)
     def set_data(self,data)
        self.server.data = data
     def start(self)
        server_thread = threading.Thread(target=self.server.serve_forever())
     def stop(self)
        self.server.shutdown()

test_mock.py

server_inst = server_wrap()
server_inst.start()
#code which sets the data and expects the handle method to print the data set
server_inst.stop()

我对这段代码的问题是,执行在 server_inst.start() 处停止,服务器进入无限侦听模式

我尝试过但失败的其他解决方案:

  • 使用全局变量
  • 使用队列
  • 起始mock_server.py有自己的

让我知道任何其他可能的解决方案。提前致谢

更新 1:

使用单独的线程将数据发送到套接字:

变化

test_mock.py

def test_set_data(data)
    server_inst = server_wrap()
    server_inst.set_data(data)
    server_inst.start()
if __name__ == "__main__":
    thread = Thread(target=test_set_data, args=("foo_data))
    thread.setDaemon(True)
    thread.start()
    #test code which verifies if data set is same
    #works so far, able to pass data
    #problem starts now
    thread = Thread(target=test_set_data, args=("bar_data))
    thread.setDaemon(True)
    thread.start()
    #says address already in use error
    #Tried calling server.shuddown() in handle , but error persists. Also there is no thread.shop in threading.Thread object

谢谢

服务器进入侦听模式。

在发送所有数据并完成测试之前,不需要server_inst.stop。也许在你的测试拆解中,或者当测试套件完成时。

要将数据发送到服务器,并让句柄选择它,您应该在 anohter 线程上打开一个套接字。然后通过此套接字将数据发送到服务器。

此代码应如下所示:

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    
sock.connect(("127.0.0.1",49555))
sock.send(... the data ...)
received = sock.recv(1024) # the handle can send a response
sock.close()

在你的 django 代码中添加一个函数,它确实在另一个线程上运行。此函数将打开套接字、连接、发送数据并获取响应。您可以从视图、中间件等调用它。

相关内容

  • 没有找到相关文章

最新更新