使用Python Twisted和Autobahn通过WebSocket从Matlab发送JSON数据



我正试图从Matlab创建一个连接,通过WebSocket流式传输JSON帧。我已经测试了我的python高速公路安装,并使用以下内容进行了扭曲。

工作示例

Matlab代码

示例驱动程序代码使用JSONlab工具箱将Matlab数据转换为JSON形式,然后我对数据进行压缩和Base64编码。由于我还没有使用RPC,所以我使用命令行,在命令行中我需要压缩和Base64编码,以避免行长度和shell转义问题。

clear all
close all
python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
i = uint32(1)
encoder = org.apache.commons.codec.binary.Base64
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    message = sprintf('%s %s %s', python, bc, b64);
    status = system(message);
    i = i + 1;
    toc;
end

广播客户端代码

客户端代码有两种调用方式。您可以通过命令行传递消息,也可以创建BroadcastClient实例并调用sendMessage。

#!/usr/bin/env python
import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy

class BroadcastClient():
    def __init__(self, server=None):
        self.proxy = Proxy(server)
    def errorMessage(self, value):
        print 'Error ', value
    def sendMessage(self, message):
        rc = self.proxy.callRemote('broadcastMessage', message).addCallback(lambda _: reactor.stop())
        rc.addErrback(self.errorMessage)

def main(cli_arguments):
    if len(cli_arguments) > 1:
        message = cli_arguments[1]
        broadcastClient = BroadcastClient('http://127.0.0.1:7080/')
        broadcastClient.sendMessage(message)
        reactor.run()
if __name__ == '__main__':
    main(sys.argv)

广播服务器代码

服务器在7080上提供RPC客户端,在8080上提供web客户端,在9080上使用TXJSONRPC、Twisted和Autobahn提供WebSocket。Autobahn Web客户端对于调试非常有用,应该与服务器代码放在同一目录中。

#!/usr/bin/env python
import sys
from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc
from autobahn.twisted.websocket import WebSocketServerFactory, 
    WebSocketServerProtocol, 
    listenWS

class BroadcastServerProtocol(WebSocketServerProtocol):
    def onOpen(self):
        self.factory.registerClient(self)
    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)
    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)

class BroadcastServerFactory(WebSocketServerFactory):
    """
    Simple broadcast server broadcasting any message it receives to all
    currently connected clients.
    """
    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []
    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)
    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)
    def broadcastMessage(self, message):
        print("broadcasting message '{}' ..".format(message))
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))
            print("message sent to {}".format(client.peer))

class BroadcastPreparedServerFactory(BroadcastServerFactory):
    """
    Functionally same as above, but optimized broadcast using
    prepareMessage and sendPreparedMessage.
    """
    def broadcastMessage(self, message):
        print("broadcasting prepared message '{}' ..".format(message))
        preparedMessage = self.prepareMessage(message.encode('utf8'), isBinary=False)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)
            print("prepared message sent to {}".format(client.peer))

class MatlabClient(jsonrpc.JSONRPC):
    factory = None
    def jsonrpc_broadcastMessage(self, message):
        if self.factory is not None:
            print self.factory.broadcastMessage(message)

if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)
    factory.protocol = BroadcastServerProtocol
    listenWS(factory)
    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))
    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)
    reactor.run()

问题-失败的尝试

首先要注意的是,如果您在从Matlab中获取python时遇到问题,您需要确保使用pyversion命令指向系统上正确版本的python,并且可以使用pyversion('/path/to/python') 进行更正

Matlab无法运行反应堆

clear all
close all
i = uint32(1)
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    bc.sendMessage(py.str(b64.'));
    py.twisted.internet.reactor.run % This won't work.
    i = i + 1;
    toc;
end

Matlab POST

另一个尝试涉及使用Matlab的webwrite来POST到服务器。事实证明,webwrite只需传递正确的weboptions即可将数据转换为JSON。

options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);

这起到了作用,但每条消息的速度很慢(约0.1秒)。我应该提到的是,矩阵不是我发送的真实数据,真实数据串行化到每条消息大约280000字节,但这提供了一个合理的近似值。

我如何调用bc.sendMessage,使其正确地运行reactor或以另一种更快的方式解决此问题?

使用Python和Matlab设置WebSocket

检查Matlab是否指向正确版本的python

首先,您需要确保使用正确的python二进制文件。例如,在Mac上,您可能使用系统标准版本,而不是Homebrew安装的版本。使用检查您的python安装位置

pyversion

您可以使用将Matlab指向正确的版本

pyversion('path/to/python')

这可能需要重新启动python。

如上所述,我使用Twisted将我的Matlab数据多路传输到WebSocket客户端。我发现解决这个问题的最好方法就是创建一个处理POSTS的服务器,然后将其传递给WebSocket客户端。压缩只是减慢了速度,所以我每次请求发送280 kBytes的JSON,每条消息大约需要0.05秒。我希望速度更快,0.01秒,但这是一个好的开始。

Matlab代码

server = 'http://127.0.0.1:7080/update.json';
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
    tic;
    packet = rand(100, 100);
    json_packet = savejson('', packet);
    r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
    toc;
end

我本可以使用Matlab webwrite函数,但通常我发现调用python更灵活。

Python WebSocket WebClient服务器

import sys
from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File
from autobahn.twisted.websocket import WebSocketServerFactory, 
    WebSocketServerProtocol, 
    listenWS

class BroadcastServerProtocol(WebSocketServerProtocol):
    def onOpen(self):
        self.factory.registerClient(self)
    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)
    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)

class BroadcastServerFactory(WebSocketServerFactory):
    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []
    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)
    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)
    def broadcastMessage(self, message):
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))

class BroadcastPreparedServerFactory(BroadcastServerFactory):
    def broadcastMessage(self, message, isBinary=False):
        if isBinary is True:
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)

class WebClient(Resource):
    webSocket = None
    def render_POST(self, request):
        self.webSocket.broadcastMessage(request.content.read())
        return 'OK'

if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)
    factory.protocol = BroadcastServerProtocol
    listenWS(factory)
    root = Resource()
    webClient = WebClient()
    webClient.webSocket = factory
    root.putChild('update.json', webClient)
    webFactory = Site(root)
    reactor.listenTCP(7080, webFactory)
    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)
    reactor.run()

我放弃了RPC尝试,只进行了一次直接的POST。仍然有很多提高绩效的机会。

相关内容

  • 没有找到相关文章

最新更新