Отправка данных JSON через WebSocket из Matlab с использованием Python Twisted и Autobahn

Я пытаюсь создать соединение из Matlab для потока кадров JSON через WebSocket. Я протестировал мою установку python для автозапуска и скрутил, используя следующее.

Рабочий пример

Код Matlab

Пример кода драйвера, который использует инструмент JSONlab для преобразования данных Matlab в форму JSON, а затем я compress и Base64 кодировать данные. Поскольку я не получил RPC для работы, я использую командную строку, где мне требуется сжатие и кодировка Base64, чтобы избежать проблем с расширением строки и оболочки.

clear all
close all

python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
i = uint32(1)

encoder = org.apache.commons.codec.binary.Base64
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    message = sprintf('%s %s %s', python, bc, b64);
    status = system(message);

    i = i + 1;
    toc;
end

Код клиента трансляции

Клиентский код имеет два способа вызова. Вы можете передать свое сообщение через командную строку или создать экземпляр BroadcastClient и вызвать sendMessage.

#!/usr/bin/env python

import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy


class BroadcastClient():

    def __init__(self, server=None):
        self.proxy = Proxy(server)

    def errorMessage(self, value):
        print 'Error ', value

    def sendMessage(self, message):
        rc = self.proxy.callRemote('broadcastMessage', message).addCallback(lambda _: reactor.stop())
        rc.addErrback(self.errorMessage)


def main(cli_arguments):
    if len(cli_arguments) > 1:
        message = cli_arguments[1]
        broadcastClient = BroadcastClient('http://127.0.0.1:7080/')
        broadcastClient.sendMessage(message)
        reactor.run()

if __name__ == '__main__':
    main(sys.argv)

Код сервера широковещания

Сервер предоставляет RPC-клиент на 7080, веб-клиент на 8080 и WebSocket на 9080 с использованием TXJSONRPC, Twisted и Autobahn. Веб-клиент Autobahn полезен для отладки и должен быть помещен в тот же каталог, что и код сервера.

#!/usr/bin/env python

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    """
    Simple broadcast server broadcasting any message it receives to all
    currently connected clients.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        print("broadcasting message '{}' ..".format(message))
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))
            print("message sent to {}".format(client.peer))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    """
    Functionally same as above, but optimized broadcast using
    prepareMessage and sendPreparedMessage.
    """

    def broadcastMessage(self, message):
        print("broadcasting prepared message '{}' ..".format(message))
        preparedMessage = self.prepareMessage(message.encode('utf8'), isBinary=False)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)
            print("prepared message sent to {}".format(client.peer))


class MatlabClient(jsonrpc.JSONRPC):

    factory = None

    def jsonrpc_broadcastMessage(self, message):
        if self.factory is not None:
            print self.factory.broadcastMessage(message)


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

Проблема - неудачные попытки

Во-первых, если у вас возникли проблемы с запуском python из Matlab, вам нужно убедиться, что вы указываете правильную версию Python в своей системе с помощью команды pyversion, и вы можете исправить ее, используя pyversion('/path/to/python')

Matlab не может запустить реактор

clear all
close all

i = uint32(1)

while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    bc.sendMessage(py.str(b64.'));
    py.twisted.internet.reactor.run % This won't work.

    i = i + 1;
    toc;
end

Matlab POST

Другая попытка заключалась в использовании Matlab webwrite для POST на сервере. Выключается webwrite будет преобразовывать данные в JSON, просто передав правильный weboptions.

options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);

Это сработало, но оказалось медленным (~ 0,1 секунды) за сообщение. Я должен упомянуть, что матрица не является реальными данными, которые я отправляю, реальные данные сериализуются примерно до 280000 байт на сообщение, но это обеспечивает разумное приближение.

Как я могу позвонить bc.sendMessage, чтобы он правильно справлялся с тем, чтобы запустить реактор или решить эту проблему другим, более быстрым способом?

Ответы

Ответ 1

Настройка WebSocket с использованием Python и Matlab

Проверить, что Matlab указывает на правильную версию python

Во-первых, вам нужно убедиться, что вы используете правильный бинарный код python. На Mac вы можете использовать стандартную версию системы вместо той, которая была установлена, например, для Homebrew. Проверьте местоположение вашей установки python с помощью:

pyversion

Вы можете указать Matlab на правильную версию, используя:

pyversion('path/to/python')

для этого может потребоваться перезагрузка python.

Как указано выше, я использую Twisted для мультиплексирования данных Matlab для клиентов WebSocket. Лучший способ решить эту проблему - просто создать сервер, который обрабатывает POSTS, а затем передает его вместе с клиентами WebSocket. Сжатие просто замедляет работу, поэтому я отправляю 280 Кбайт JSON за запрос, который принимает примерно 0,05 секунды за сообщение. Я бы хотел, чтобы это было быстрее, 0,01 секунды, но это хороший старт.

Код Matlab

server = 'http://127.0.0.1:7080/update.json';
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
    tic;
    packet = rand(100, 100);
    json_packet = savejson('', packet);
    r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
    toc;
end

Я мог бы использовать функцию Matlab webwrite, но обычно я нахожу, что вызов на python является более гибким.

Сервер Python WebSocket-WebClient

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    def broadcastMessage(self, message, isBinary=False):
        if isBinary is True:
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)


class WebClient(Resource):

    webSocket = None

    def render_POST(self, request):
        self.webSocket.broadcastMessage(request.content.read())

        return 'OK'


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    root = Resource()
    webClient = WebClient()
    webClient.webSocket = factory
    root.putChild('update.json', webClient)
    webFactory = Site(root)
    reactor.listenTCP(7080, webFactory)

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

Я избавился от попытки RPC и просто пошел с прямым POST. Еще много возможностей для повышения производительности.