Python高速公路Websocket服务器每次接收一个连接


Autobahn使用的Python Asyncio或Twisted应该同时处理并发连接。我遵循了一个关于高速公路的好教程,阅读了文档,一切都很好,但服务器只接收到一个连接并处理它的请求,然后接受第二个连接。

我如何确保服务器在不持有其他连接对等体的情况下同时接收多个连接?

我在网上搜索了一整天,但没有成功这是我的代码(我在调试时删掉了很多代码(

from autobahn.asyncio.websocket import WebSocketServerProtocol
from autobahn.asyncio.websocket import WebSocketServerFactory
class NMmapperServerProtocol(WebSocketServerProtocol):
cmd = NMmapperWSCommandParser() # I have cut out this due to debugging
def onMessage(self, payload, isBinary):
"""
@payload the message
@isBinary whether it's a binary message
"""
try:
offload_payload = json.loads(payload.decode("utf-8"))
await asyncio.gather(cmd.processWSCommands(offload_payload, self))
except Exception as e:
raise
def onConnect(self, request):
"""
When we've got a peer connect to our server
"""
try:
#print(self)
print(request.peer, "Has connected")
except Exception as e:
raise
def onOpen(self):
"""
We have a fully connection
"""
try:
# Some database action can be made from here
print("Connection now opened")
except Exception as e:
raise
def onClose(self, wasClean, code, reason):
"""
@ the client is closing his or her
connection
"""
try:
print("wasClean ", wasClean)
print("code ", code)
print("reason ", reason)
except Exception as e:
raise
# Setters
def setCsrftoken(self, cookie_string):
"""
@ parse an set
"""
self.csrftoken = self.parse_csrftoken(cookie_string)
# Setters
def setSession(self, cookie_string):
"""
@ parse an set
"""
self.session = self.parse_session(cookie_string)

if __name__=="__main__":
if(IN_PRODUCTION):
print("RUNNING ")
factory = NMmapperWSServerFactory(PRODUCTION_HOST, PRODUCTION_PORT)
factory.run_loop()
else:
print("Running on dev")
factory = WebSocketServerFactory()
factory.protocol = NMmapperServerProtocol

loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '0.0.0.0', 9000)
server = loop.run_until_complete(coro)

try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.close()

谢谢。

我终于让它按预期工作了。作为异步库我必须在执行长时间运行任务的每个方法上加上async前缀

问题是onMessage,我必须并行处理消息而不是阻止想要处理这些消息的其他客户端。所以我不得不去

offload_payload = json.loads(payload.decode("utf-8"))
loop = asyncio.get_event_loop()
# Offload command processing
loop.create_task(self.processWSCommands(offload_payload, self))

通过这种方式,每个消息都被并行处理即使在这种情况下,也要确保处理消息的方法或函数不会阻塞。

from autobahn.asyncio.websocket import WebSocketServerProtocol
from autobahn.asyncio.websocket import WebSocketServerFactory
class NMmapperServerProtocol(WebSocketServerProtocol):
cmd = NMmapperWSCommandParser() # I have cut out this due to debugging
async def onMessage(self, payload, isBinary):
"""
@payload the message
@isBinary whether it's a binary message
"""
try:
offload_payload = json.loads(payload.decode("utf-8"))
loop = asyncio.get_event_loop()
#loop.create_task(runner(10, self.peer))
#asyncio.gather(runner(20, self.peer))
# Offload command processing
loop.create_task(self.processWSCommands(offload_payload, self))
except Exception as e:
raise
def onConnect(self, request):
"""
When we've got a peer connect to our server
"""
try:
#print(self)
print(request.peer, "Has connected")
except Exception as e:
raise
def onOpen(self):
"""
We have a fully connection
"""
try:
# Some database action can be made from here
print("Connection now opened")
except Exception as e:
raise
def onClose(self, wasClean, code, reason):
"""
@ the client is closing his or her
connection
"""
try:
print("wasClean ", wasClean)
print("code ", code)
print("reason ", reason)
except Exception as e:
raise
# Setters
def setCsrftoken(self, cookie_string):
"""
@ parse an set
"""
self.csrftoken = self.parse_csrftoken(cookie_string)
# Setters
def setSession(self, cookie_string):
"""
@ parse an set
"""
self.session = self.parse_session(cookie_string)

if __name__=="__main__":
if(IN_PRODUCTION):
print("RUNNING ")
factory = NMmapperWSServerFactory(PRODUCTION_HOST, PRODUCTION_PORT)
factory.run_loop()
else:
print("Running on dev")
factory = WebSocketServerFactory()
factory.protocol = NMmapperServerProtocol
loop = asyncio.get_event_loop()
coro = loop.create_server(factory, '0.0.0.0', 9000)
server = loop.run_until_complete(coro)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.close()

最新更新