我正在实现 ZMQ 的 Espresso 模式。
我想连接很多订阅者<>代理<>很多发布者
但是,代理中的监听器只接收来自一个发布者的消息。因此,订阅者只能从那个特定的发布者那里接收。 我无法弄清楚我的代码有什么问题。
package playground
import (
zmq "github.com/pebbe/zmq4"
"fmt"
"math/rand"
"time"
"testing"
)
func subscriber_thread(id int) {
subscriber, _ := zmq.NewSocket(zmq.SUB)
subscriber.Connect("tcp://localhost:6001")
subscriber.SetSubscribe("")
defer subscriber.Close()
for {
msg, err := subscriber.RecvMessage(0)
if err != nil {
panic(err)
}
fmt.Println("subscriber id:", id,"received:", msg)
}
}
func publisher_thread(n int) {
publisher, _ := zmq.NewSocket(zmq.PUB)
publisher.Bind("tcp://*:6000")
for {
s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))
_, err := publisher.SendMessage(s)
if err != nil {
panic(err)
}
fmt.Println("publisher sent:", s)
time.Sleep(100 * time.Millisecond) // Wait for 1/10th second
}
}
// The listener receives all messages flowing through the proxy, on its
// pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
// attached child threads. In other languages your mileage may vary:
func listener_thread() {
pipe, _ := zmq.NewSocket(zmq.PAIR)
pipe.Bind("inproc://pipe")
// Print everything that arrives on pipe
for {
msg, err := pipe.RecvMessage(0)
if err != nil {
break // Interrupted
}
fmt.Printf("%q\n", msg)
}
}
func TestZmqEspresso(t *testing.T) {
go publisher_thread(0)
go publisher_thread(1)
go publisher_thread(2)
go subscriber_thread(1)
go subscriber_thread(2)
go listener_thread()
time.Sleep(100 * time.Millisecond)
subscriber, _ := zmq.NewSocket(zmq.XSUB)
subscriber.Connect("tcp://localhost:6000")
publisher, _ := zmq.NewSocket(zmq.XPUB)
publisher.Bind("tcp://*:6001")
listener, _ := zmq.NewSocket(zmq.PAIR)
listener.Connect("inproc://pipe")
zmq.Proxy(subscriber, publisher, listener)
fmt.Println("interrupted")
}
最佳答案
我找到了解决方案。 XPUB/XSUB 应该绑定(bind)到套接字 PUB 和 SUB worker 应该连接到套接字
下面的工作代码
package playground
import (
zmq "github.com/pebbe/zmq4"
"fmt"
"log"
"math/rand"
"testing"
"time"
)
func subscriber_thread(id int) {
subscriber, err := zmq.NewSocket(zmq.SUB)
if err != nil {
panic(err)
}
err = subscriber.Connect("tcp://localhost:6001")
if err != nil {
panic(err)
}
err = subscriber.SetSubscribe("")
if err != nil {
panic(err)
}
defer subscriber.Close()
for {
msg, err := subscriber.RecvMessage(0)
if err != nil {
panic(err)
}
fmt.Println("subscriber id:", id, "received:", msg)
}
}
func publisher_thread(n int) {
publisher, err := zmq.NewSocket(zmq.PUB)
if err != nil {
panic(err)
}
//err = publisher.Bind("tcp://*:6000")
err = publisher.Connect("tcp://localhost:6000")
if err != nil {
panic(err)
}
for {
s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
_, err := publisher.SendMessage(s)
if err != nil {
panic(err)
}
fmt.Println("publisher sent:", s)
time.Sleep(100 * time.Millisecond) // Wait for 1/10th second
}
}
// The listener receives all messages flowing through the proxy, on its
// pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
// attached child threads. In other languages your mileage may vary:
func listener_thread() {
pipe, _ := zmq.NewSocket(zmq.PAIR)
pipe.Bind("inproc://pipe")
// Print everything that arrives on pipe
for {
msg, err := pipe.RecvMessage(0)
if err != nil {
break // Interrupted
}
fmt.Printf("%q\n", msg)
}
}
func TestZmqEspresso(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)
go publisher_thread(0)
go publisher_thread(1)
go publisher_thread(2)
go subscriber_thread(1)
go subscriber_thread(2)
go listener_thread()
time.Sleep(100 * time.Millisecond)
subscriber, err := zmq.NewSocket(zmq.XSUB)
if err != nil {
panic(err)
}
//err = subscriber.Connect("tcp://localhost:6000")
err = subscriber.Bind("tcp://*:6000")
if err != nil {
panic(err)
}
publisher, err := zmq.NewSocket(zmq.XPUB)
if err != nil {
panic(err)
}
err = publisher.Bind("tcp://*:6001")
if err != nil {
panic(err)
}
listener, _ := zmq.NewSocket(zmq.PAIR)
listener.Connect("inproc://pipe")
err = zmq.Proxy(subscriber, publisher, listener)
if err != nil {
panic(err)
}
fmt.Println("interrupted")
}
关于go - ZMQ 无法接收来自多个发布者的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53973010/
我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
我有多个ActiveRecord子类Item的实例数组,我需要根据最早的事件循环打印。在这种情况下,我需要打印付款和维护日期,如下所示:ItemAmaintenancerequiredin5daysItemBpaymentrequiredin6daysItemApaymentrequiredin7daysItemBmaintenancerequiredin8days我目前有两个查询,用于查找maintenance和payment项目(非排他性查询),并输出如下内容:paymentrequiredin...maintenancerequiredin...有什么方法可以改善上述(丑陋的)代
我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何
我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在
我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r
我正在尝试在我的centos服务器上安装therubyracer,但遇到了麻烦。$geminstalltherubyracerBuildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingtherubyracer:ERROR:Failedtobuildgemnativeextension./usr/local/rvm/rubies/ruby-1.9.3-p125/bin/rubyextconf.rbcheckingformain()in-lpthread...yescheckingforv8.h...no***e
我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳
我有一个具有一些属性的模型:attr1、attr2和attr3。我需要在不执行回调和验证的情况下更新此属性。我找到了update_column方法,但我想同时更新三个属性。我需要这样的东西:update_columns({attr1:val1,attr2:val2,attr3:val3})代替update_column(attr1,val1)update_column(attr2,val2)update_column(attr3,val3) 最佳答案 您可以使用update_columns(attr1:val1,attr2:val2
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这