未在服务器上调用高速公路Python-onMessage事件



我对python和WebSocket很陌生,我正在使用Twisted和autobahn为学校项目创建WebSocket服务器。我正在使用这个Python聊天示例中的html/javascript客户端测试这个服务器。

服务器本身通过关键字(sessionID):client的字典来跟踪连接的客户端。当调用OnMessage()时,我读取值,创建一个json,然后使用Kafka发送该json进行其他处理。

服务器在我的本地主机上运行良好,使用我们学校提供的Ubuntu服务器上的Kafka。然而,在我尝试在Ubuntu服务器上部署这个WebSocket服务器之后,我无法使OnMessage()事件发生。

我得到了OnOpen()、OnClose()事件,但无论我对客户端/服务器做什么,都不会调用OnMessage(OnMessageBegin)事件。

我尝试通过iptables和ufw关闭防火墙,但没有成功。由于此服务器的限制,只有8000到9000之间的端口处于打开状态。当使用websocket时,这可能是高速公路/扭曲的问题吗?

以下是我用于WebSocket服务器协议的代码:

class MiddleServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
logger.info("Client connecting: {0}".format(request.peer))
def onOpen(self):
identifier = uuid.uuid4()
self.factory.register(self, identifier)
logger.info("WebSocket connection open, Session ID: " + identifier)
def onMessageBegin(self, isBinary):
super(MiddleServerProtocol, self).onMessageBegin(isBinary)

def onMessage(self, payload, is_binary):
key = str(list(clients.keys())[list(clients.values()).index(self)])
if is_binary:
logger.error("Invalid message recieved: {0} bytes".
format(len(payload)) + " , Session ID: " + key)
else:
# SEND MESSAGE TO KAFKA
message = prepare_message(payload, key).encode("utf-8")
self.factory.producer_kafka.send(configLoad.
get_value('TOPIC_SEND'),
value=json.loads(message),
key=key.encode('utf-8'))
self.factory.producer_kafka.flush()
logger.info("Json message received: " + message.
decode('cp1250') + " ,Session ID: " + key)
def onClose(self, was_clean, code, reason):
key = str(list(clients.keys())[list(clients.values()).index(self)])
logger.info("WebSocket connection closed: {0}".
format(reason) + " , Session ID: " + key)

这是我在Websocket服务器工厂使用的代码:

class MiddleServerFactory(WebSocketServerFactory):
def __init__(self, *args, **kwargs):
super(MiddleServerFactory, self).__init__(*args, **kwargs)
self.producer_kafka = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=configLoad.
get_value('BOOTSTRAPSERVER'))
logger.info(" Server successfully started.")
def register(self, client, identifier):
"""
Add client to list of managed connections.
We use generated ID as a unique identifier.
"""
clients[identifier] = client
def unregister(self, identifier):
"""
Remove client from list of managed connections.
"""
del clients[identifier]

以下是我运行它的方式:

factory = MiddleServerFactory(configLoad.get_value('URL'))
factory.protocol = MiddleServerProtocol
reactor.listenTCP(int(configLoad.get_value('PORT')), factory)
reactor.run()

感谢您提前提供的帮助和建议。

对于任何仍然想知道的人:

这一次的问题是,我没有在服务器和客户端之间使用HTTPS(WSS)通信,因为服务器本身位于代理之后,这会弄乱HTTP标头。为了解决这个问题,我不得不切换到HTTPS(WSS)通信,它显然保持了HTTP标头的加密,代理不会干扰它们。

最新更新