草庐IT

go - ZMQ 无法接收来自多个发布者的消息

coder 2024-07-09 原文

我正在实现 ZMQ 的 Espresso 模式。

我想连接很多订阅者<>代理<>很多发布者

但是,代理中的监听器只接收来自一个发布者的消息。因此,订阅者只能从那个特定的发布者那里接收。 我无法弄清楚我的代码有什么问题。

package playground

import (
    zmq "github.com/pebbe/zmq4"

    "fmt"
    "math/rand"
    "time"
    "testing"
)

func subscriber_thread(id int) {
    subscriber, _ := zmq.NewSocket(zmq.SUB)
    subscriber.Connect("tcp://localhost:6001")
    subscriber.SetSubscribe("")
    defer subscriber.Close()

    for {
        msg, err := subscriber.RecvMessage(0)
        if err != nil {
            panic(err)
        }
        fmt.Println("subscriber id:", id,"received:", msg)
    }
}

func publisher_thread(n int) {
    publisher, _ := zmq.NewSocket(zmq.PUB)
    publisher.Bind("tcp://*:6000")

    for {
        s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))
        _, err := publisher.SendMessage(s)
        if err != nil {
            panic(err)
        }
        fmt.Println("publisher sent:", s)
        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second
    }
}

//  The listener receives all messages flowing through the proxy, on its
//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//  attached child threads. In other languages your mileage may vary:

func listener_thread() {
    pipe, _ := zmq.NewSocket(zmq.PAIR)
    pipe.Bind("inproc://pipe")

    //  Print everything that arrives on pipe
    for {
        msg, err := pipe.RecvMessage(0)
        if err != nil {
            break //  Interrupted
        }
        fmt.Printf("%q\n", msg)
    }
}

func TestZmqEspresso(t *testing.T) {
    go publisher_thread(0)
    go publisher_thread(1)
    go publisher_thread(2)

    go subscriber_thread(1)
    go subscriber_thread(2)

    go listener_thread()

    time.Sleep(100 * time.Millisecond)

    subscriber, _ := zmq.NewSocket(zmq.XSUB)
    subscriber.Connect("tcp://localhost:6000")

    publisher, _ := zmq.NewSocket(zmq.XPUB)
    publisher.Bind("tcp://*:6001")

    listener, _ := zmq.NewSocket(zmq.PAIR)
    listener.Connect("inproc://pipe")

    zmq.Proxy(subscriber, publisher, listener)

    fmt.Println("interrupted")

}

最佳答案

我找到了解决方案。 XPUB/XSUB 应该绑定(bind)到套接字 PUB 和 SUB worker 应该连接到套接字

下面的工作代码

package playground

import (
    zmq "github.com/pebbe/zmq4"

    "fmt"
    "log"
    "math/rand"
    "testing"
    "time"
)

func subscriber_thread(id int) {
    subscriber, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        panic(err)
    }
    err = subscriber.Connect("tcp://localhost:6001")
    if err != nil {
        panic(err)
    }
    err = subscriber.SetSubscribe("")
    if err != nil {
        panic(err)
    }
    defer subscriber.Close()

    for {
        msg, err := subscriber.RecvMessage(0)
        if err != nil {
            panic(err)
        }
        fmt.Println("subscriber id:", id, "received:", msg)
    }
}

func publisher_thread(n int) {
    publisher, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        panic(err)
    }
    //err = publisher.Bind("tcp://*:6000")
    err = publisher.Connect("tcp://localhost:6000")
    if err != nil {
        panic(err)
    }

    for {
        s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
        _, err := publisher.SendMessage(s)
        if err != nil {
            panic(err)
        }
        fmt.Println("publisher sent:", s)
        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second
    }
}

//  The listener receives all messages flowing through the proxy, on its
//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//  attached child threads. In other languages your mileage may vary:

func listener_thread() {
    pipe, _ := zmq.NewSocket(zmq.PAIR)
    pipe.Bind("inproc://pipe")

    //  Print everything that arrives on pipe
    for {
        msg, err := pipe.RecvMessage(0)
        if err != nil {
            break //  Interrupted
        }
        fmt.Printf("%q\n", msg)
    }
}

func TestZmqEspresso(t *testing.T) {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)

    go publisher_thread(0)
    go publisher_thread(1)
    go publisher_thread(2)

    go subscriber_thread(1)
    go subscriber_thread(2)

    go listener_thread()

    time.Sleep(100 * time.Millisecond)

    subscriber, err := zmq.NewSocket(zmq.XSUB)
    if err != nil {
        panic(err)
    }
    //err = subscriber.Connect("tcp://localhost:6000")
    err = subscriber.Bind("tcp://*:6000")
    if err != nil {
        panic(err)
    }

    publisher, err := zmq.NewSocket(zmq.XPUB)
    if err != nil {
        panic(err)
    }
    err = publisher.Bind("tcp://*:6001")
    if err != nil {
        panic(err)
    }

    listener, _ := zmq.NewSocket(zmq.PAIR)
    listener.Connect("inproc://pipe")

    err = zmq.Proxy(subscriber, publisher, listener)
    if err != nil {
        panic(err)
    }

    fmt.Println("interrupted")

}

关于go - ZMQ 无法接收来自多个发布者的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53973010/

有关go - ZMQ 无法接收来自多个发布者的消息的更多相关文章

  1. ruby-on-rails - 由于 "wkhtmltopdf",PDFKIT 显然无法正常工作 - 2

    我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-

  2. ruby-on-rails - Rails 3 中的多个路由文件 - 2

    Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题

  3. ruby-on-rails - 在 Ruby 中循环遍历多个数组 - 2

    我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代

  4. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  5. ruby-on-rails - 无法使用 Rails 3.2 创建插件? - 2

    我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby​​1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在

  6. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  7. ruby-on-rails - 无法在centos上安装therubyracer(V8和GCC出错) - 2

    我正在尝试在我的centos服务器上安装therubyracer,但遇到了麻烦。$geminstalltherubyracerBuildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingtherubyracer:ERROR:Failedtobuildgemnativeextension./usr/local/rvm/rubies/ruby-1.9.3-p125/bin/rubyextconf.rbcheckingformain()in-lpthread...yescheckingforv8.h...no***e

  8. ruby - 无法让 RSpec 工作—— 'require' : cannot load such file - 2

    我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳

  9. ruby - 多个属性的 update_column 方法 - 2

    我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2

  10. ruby-on-rails - 在 ruby​​ .gemspec 文件中,如何指定依赖项的多个版本? - 2

    我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这

随机推荐