草庐IT

python - 使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据

coder 2023-08-14 原文

我正在尝试从 Matlab 创建连接以通过 WebSocket 流式传输 JSON 帧。我已经测试了高速公路的 python 安装并使用以下命令进行了扭曲。

工作示例

Matlab代码

使用 JSONlab 的示例驱动程序代码工具箱将 Matlab 数据转换为 JSON 格式,然后我 compressBase64对数据进行编码。由于我还没有让 RPC 工作,所以我在需要压缩和 Base64 编码的地方使用命令行来避免行长和 shell 转义问题。

clear all
close all

python = '/usr/local/bin/python'
bc = '/Users/palmerc/broadcast_client.py'
i = uint32(1)

encoder = org.apache.commons.codec.binary.Base64
while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    message = sprintf('%s %s %s', python, bc, b64);
    status = system(message);

    i = i + 1;
    toc;
end

广播客户端代码

客户端代码有两种调用方式。您可以通过命令行传递消息或创建 BroadcastClient 实例并调用 sendMessage。

#!/usr/bin/env python

import sys
from twisted.internet import reactor
from txjsonrpc.web.jsonrpc import Proxy


class BroadcastClient():

    def __init__(self, server=None):
        self.proxy = Proxy(server)

    def errorMessage(self, value):
        print 'Error ', value

    def sendMessage(self, message):
        rc = self.proxy.callRemote('broadcastMessage', message).addCallback(lambda _: reactor.stop())
        rc.addErrback(self.errorMessage)


def main(cli_arguments):
    if len(cli_arguments) > 1:
        message = cli_arguments[1]
        broadcastClient = BroadcastClient('http://127.0.0.1:7080/')
        broadcastClient.sendMessage(message)
        reactor.run()

if __name__ == '__main__':
    main(sys.argv)

广播服务器代码

服务器在 7080 上提供 RPC 客户端,在 8080 上提供 Web 客户端,在 9080 上使用 TXJSONRPC、Twisted 和 Autobahn 提供 WebSocket。 Autobahn Web Client用于调试,应与服务器代码放在同一目录中。

#!/usr/bin/env python

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.server import Site
from twisted.web.static import File
from txjsonrpc.web import jsonrpc

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    """
    Simple broadcast server broadcasting any message it receives to all
    currently connected clients.
    """

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        print("broadcasting message '{}' ..".format(message))
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))
            print("message sent to {}".format(client.peer))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    """
    Functionally same as above, but optimized broadcast using
    prepareMessage and sendPreparedMessage.
    """

    def broadcastMessage(self, message):
        print("broadcasting prepared message '{}' ..".format(message))
        preparedMessage = self.prepareMessage(message.encode('utf8'), isBinary=False)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)
            print("prepared message sent to {}".format(client.peer))


class MatlabClient(jsonrpc.JSONRPC):

    factory = None

    def jsonrpc_broadcastMessage(self, message):
        if self.factory is not None:
            print self.factory.broadcastMessage(message)


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    matlab = MatlabClient()
    matlab.factory = factory
    reactor.listenTCP(7080, Site(matlab))

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

问题 - 失败的尝试

首先请注意,如果您在使用 Matlab 运行 python 时遇到问题,您需要确保使用 pyversion 命令指向系统上正确的 Python 版本,然后您可以更正它使用 pyversion('/path/to/python')

Matlab 无法运行 reactor

clear all
close all

i = uint32(1)

while true
    tic;
    packet = rand(100, 100);
    json_packet = uint8(savejson('', packet));
    compressed = CompressLib.compress(json_packet);
    b64 = char(encoder.encode(compressed));
    bc.sendMessage(py.str(b64.'));
    py.twisted.internet.reactor.run % This won't work.

    i = i + 1;
    toc;
end

Matlab 发布

另一种尝试涉及使用 Matlab 的 webwrite 来 POST 到服务器。事实证明,webwrite 只需传递正确的 weboptions 即可将数据转换为 JSON。

options = weboptions('MediaType', 'application/json');
data = struct('Matrix', rand(100, 100));
webwrite(server, data, options);

这行得通,但事实证明每条消息的速度很慢(~0.1 秒)。我应该提到矩阵不是我发送的真实数据,真实数据序列化为每条消息大约 280000 字节,但这提供了一个合理的近似值。

我如何调用 bc.sendMessage 以便它正确地设法让 react 堆运行或以另一种更快的方式解决这个问题?

最佳答案

使用 Python 和 Matlab 设置 WebSocket

检查 Matlab 是否指向正确的 python 版本

首先,您需要确保使用的是正确的 python 二进制文件。在 Mac 上,您可能使用的是系统标准版本,而不是 Homebrew 安装的版本。使用以下命令检查 python 安装的位置:

pyversion

您可以使用以下方法将 Matlab 指向正确的版本:

pyversion('path/to/python')

这可能需要您重新启动 python。

如上所述,我正在使用 Twisted 将我的 Matlab 数据多路传输到 WebSocket 客户端。我发现解决此问题的最佳方法是简单地创建一个处理 POSTS 的服务器,然后将其传递给 WebSocket 客户端。压缩只会减慢速度,所以我为每个请求发送 280 kBytes 的 JSON,每条消息大约需要 0.05 秒。我希望它更快,0.01 秒,但这是一个好的开始。

Matlab代码

server = 'http://127.0.0.1:7080/update.json';
headers = py.dict(pyargs('Charset','UTF-8','Content-Type','application/json'));
while true
    tic;
    packet = rand(100, 100);
    json_packet = savejson('', packet);
    r = py.requests.post(server, pyargs('data', json_packet, 'headers', headers));
    toc;
end

我本可以使用 Matlab webwrite 函数,但通常我发现调用 python 更加灵活。

Python WebSocket-WebClient服务端

import sys

from twisted.internet import reactor
from twisted.python import log
from twisted.web.resource import Resource
from twisted.web.server import Site
from twisted.web.static import File

from autobahn.twisted.websocket import WebSocketServerFactory, \
    WebSocketServerProtocol, \
    listenWS


class BroadcastServerProtocol(WebSocketServerProtocol):

    def onOpen(self):
        self.factory.registerClient(self)

    def onMessage(self, payload, isBinary):
        if not isBinary:
            message = "{} from {}".format(payload.decode('utf8'), self.peer)
            self.factory.broadcastMessage(message)

    def connectionLost(self, reason):
        WebSocketServerProtocol.connectionLost(self, reason)
        self.factory.unregisterClient(self)


class BroadcastServerFactory(WebSocketServerFactory):

    def __init__(self, url, debug=False, debugCodePaths=False):
        WebSocketServerFactory.__init__(self, url, debug=debug, debugCodePaths=debugCodePaths)
        self.clients = []

    def registerClient(self, client):
        if client not in self.clients:
            print("registered client {}".format(client.peer))
            self.clients.append(client)

    def unregisterClient(self, client):
        if client in self.clients:
            print("unregistered client {}".format(client.peer))
            self.clients.remove(client)

    def broadcastMessage(self, message):
        for client in self.clients:
            client.sendMessage(message.encode('utf8'))


class BroadcastPreparedServerFactory(BroadcastServerFactory):

    def broadcastMessage(self, message, isBinary=False):
        if isBinary is True:
            message = message.encode('utf8')
        preparedMessage = self.prepareMessage(message, isBinary=isBinary)
        for client in self.clients:
            client.sendPreparedMessage(preparedMessage)


class WebClient(Resource):

    webSocket = None

    def render_POST(self, request):
        self.webSocket.broadcastMessage(request.content.read())

        return 'OK'


if __name__ == '__main__':

    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False
    factory = BroadcastPreparedServerFactory(u"ws://127.0.0.1:9000",
                                             debug=debug,
                                             debugCodePaths=debug)

    factory.protocol = BroadcastServerProtocol
    listenWS(factory)

    root = Resource()
    webClient = WebClient()
    webClient.webSocket = factory
    root.putChild('update.json', webClient)
    webFactory = Site(root)
    reactor.listenTCP(7080, webFactory)

    webdir = File(".")
    web = Site(webdir)
    reactor.listenTCP(8080, web)

    reactor.run()

我摆脱了 RPC 尝试,直接使用 POST。仍然有很多提高性能的机会。

关于python - 使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34357973/

有关python - 使用 Python Twisted 和 Autobahn 从 Matlab 通过 WebSocket 发送 JSON 数据的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐